Skip to content

Chore(cache): refactor/fix s3 #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: pr

on:
pull_request:

concurrency:
group: pr-${{ github.event.number }}
cancel-in-progress: true

jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version-file: go.mod
- uses: docker://morphy/revive-action:v2

test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version-file: go.mod
- run: go test -v ./...
151 changes: 86 additions & 65 deletions cmd/bot/main.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// package main is the entry point for this application
package main

import (
Expand All @@ -14,34 +15,101 @@ import (
bk "github.com/tailscale/go-bluesky"
"github.com/till/golangoss-bluesky/internal/bluesky"
"github.com/till/golangoss-bluesky/internal/content"
"github.com/till/golangoss-bluesky/internal/utils"
"github.com/urfave/cli/v2"
)

var (
blueskyHandle string = "[email protected]"
blueskyAppKey string = ""

cacheBucket string = "golangoss-cache-bucket"

ctx context.Context
blueskyHandle = "[email protected]"
blueskyAppKey = ""

// for cache
awsEndpoint string = ""
awsAccessKeyId string = ""
awsSecretKey string = ""
awsEndpoint = ""
awsAccessKeyID = ""
awsSecretKey = ""
cacheBucket = "golangoss-cache-bucket"

// for github crawling
githubToken string = ""
githubToken = ""

checkInterval time.Duration = 15 * time.Minute
// How long to wait before retrying after a connection failure
reconnectDelay time.Duration = 2 * time.Minute
)

func init() {
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug,
})))
}

// connectBluesky establishes a connection to Bluesky and logs in
func connectBluesky(ctx context.Context) (*bk.Client, error) {
client, err := bk.Dial(ctx, bk.ServerBskySocial)
if err != nil {
return nil, fmt.Errorf("failed to open connection: %v", err)
}

if err := client.Login(ctx, blueskyHandle, blueskyAppKey); err != nil {
client.Close()
switch {
case errors.Is(err, bk.ErrMasterCredentials):
return nil, fmt.Errorf("you're not allowed to use your full-access credentials, please create an appkey")
case errors.Is(err, bk.ErrLoginUnauthorized):
return nil, fmt.Errorf("username of application password seems incorrect, please double check")
default:
return nil, fmt.Errorf("login failed: %v", err)
}
}

ctx = context.Background()
return client, nil
}

// runWithReconnect attempts to run the bot with automatic reconnection on failure
func runWithReconnect(ctx context.Context, mc *minio.Client) error {
for {
client, err := connectBluesky(ctx)
if err != nil {
slog.Error("failed to connect to Bluesky", "error", err)
slog.Info("retrying connection", "delay", reconnectDelay)
time.Sleep(reconnectDelay)
continue
}

c := bluesky.Client{
Client: client,
}

cacheClient := content.NewCacheClientS3(ctx, mc, cacheBucket)

// Initialize and start the cleanup handler
cleanup := content.NewS3Cleanup(mc, cacheBucket)
cleanup.Start(ctx)
defer cleanup.Stop()

if err := content.Start(githubToken, cacheClient); err != nil {
slog.Error("failed to start service", "error", err)
client.Close()
time.Sleep(reconnectDelay)
continue
}

// Run the main loop
for {
slog.DebugContext(ctx, "checking...")
if err := content.Do(ctx, c); err != nil {
if !errors.Is(err, content.ErrCouldNotContent) {
slog.Error("error during content check", "error", err)
client.Close()
time.Sleep(reconnectDelay)
break
}
slog.DebugContext(ctx, "backing off...")
}

time.Sleep(checkInterval)
}
}
}

func main() {
Expand Down Expand Up @@ -71,7 +139,7 @@ func main() {
Name: "aws-access-key-id",
EnvVars: []string{"AWS_ACCESS_KEY_ID"},
Required: true,
Destination: &awsAccessKeyId,
Destination: &awsAccessKeyID,
},
&cli.StringFlag{
Name: "aws-secret-key",
Expand All @@ -88,73 +156,26 @@ func main() {
},

Action: func(cCtx *cli.Context) error {
// FIXME: run this in a control loop; or we crash the app
client, err := bk.Dial(ctx, bk.ServerBskySocial)
if err != nil {
return fmt.Errorf("failed to open connection: %v", err)
}
defer client.Close()

if err := client.Login(ctx, blueskyHandle, blueskyAppKey); err != nil {
switch {
case errors.Is(err, bk.ErrMasterCredentials):
return fmt.Errorf("you're not allowed to use your full-access credentials, please create an appkey")
case errors.Is(err, bk.ErrLoginUnauthorized):
return fmt.Errorf("username of application password seems incorrect, please double check")
default:
return fmt.Errorf("something else went wrong, please look at the returned error")
}
}

// init s3 client
// Initialize S3 client
mc, err := minio.New(awsEndpoint, &minio.Options{
Creds: credentials.NewStaticV4(awsAccessKeyId, awsSecretKey, ""),
Creds: credentials.NewStaticV4(awsAccessKeyID, awsSecretKey, ""),
Secure: true,
})
if err != nil {
return fmt.Errorf("failed to initialize minio client: %v", err)
}

// ensure the bucket exists
if err := mc.MakeBucket(ctx, cacheBucket, minio.MakeBucketOptions{}); err != nil {
// Ensure the bucket exists
if err := mc.MakeBucket(cCtx.Context, cacheBucket, minio.MakeBucketOptions{}); err != nil {
return fmt.Errorf("failed to create bucket: %v", err)
}

c := bluesky.Client{
Client: client,
}

cacheClient := &content.CacheClientS3{
MC: mc,
Bucket: cacheBucket,
CTX: ctx,
}

if err := content.Start(githubToken, cacheClient); err != nil {
return fmt.Errorf("failed to start service: %v", err)
}

var runErr error

for {
slog.DebugContext(ctx, "checking...")
if err := content.Do(ctx, c); err != nil {
if !errors.Is(err, content.ErrCouldNotContent) {
runErr = err
break
}
slog.DebugContext(ctx, "backing off...")
}

time.Sleep(checkInterval)
}
return runErr
return runWithReconnect(cCtx.Context, mc)
},
}

if err := bot.Run(os.Args); err != nil {
slog.ErrorContext(ctx, err.Error())
utils.LogError(err)
os.Exit(1)
}

}
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/till/golangoss-bluesky

go 1.23.0

toolchain go1.24.1
go 1.24.1

require (
github.com/go-redis/redis/v8 v8.11.5
Expand Down
3 changes: 2 additions & 1 deletion internal/bluesky/bluesky.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
bk "github.com/tailscale/go-bluesky"
)

// Client wraps the official bluesky sdk
type Client struct {
Client *bk.Client
}
Expand Down Expand Up @@ -51,7 +52,7 @@ func PostRecord(title, description, url, author, stargazers, hashtags string) *b
text += "\n\n" + hashtags
}

var startRepoURL int64 = 0
var startRepoURL int64

facets := []*bsky.RichtextFacet{}
facets = append(facets, addFacet(
Expand Down
2 changes: 2 additions & 0 deletions internal/bluesky/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package bluesky wraps the official bluesky sdk
package bluesky
39 changes: 0 additions & 39 deletions internal/content/cache.go

This file was deleted.

10 changes: 8 additions & 2 deletions internal/content/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,25 @@ package content
import (
"context"
"errors"
"fmt"
"log/slog"
"strings"

"github.com/ezeoleaf/larry/cache"
"github.com/ezeoleaf/larry/config"
"github.com/ezeoleaf/larry/provider/github"
"github.com/till/golangoss-bluesky/internal/bluesky"
"github.com/till/golangoss-bluesky/internal/utils"
)

var (
provider github.Provider
provider github.Provider

// ErrCouldNotContent is returned when content cannot be fetched
ErrCouldNotContent = errors.New("could not get content")
)

// Start bootstraps the content provider
func Start(token string, cacheClient cache.Client) error {
cfg := config.Config{
Language: "go",
Expand All @@ -26,10 +31,11 @@ func Start(token string, cacheClient cache.Client) error {
return nil
}

// Do gets content from the provider and posts it to bluesky
func Do(ctx context.Context, c bluesky.Client) error {
p, err := provider.GetContentToPublish()
if err != nil {
slog.Error("error fetching content", slog.Any("err", err))
utils.LogError(fmt.Errorf("error fetching content: %w", err))
return ErrCouldNotContent
}

Expand Down
2 changes: 2 additions & 0 deletions internal/content/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package content provides a custom cache implementation
package content
Loading