Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add new option --duration, --since and --until #49

Merged
merged 5 commits into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 54 additions & 2 deletions cmd/s3s/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"os"
"os/signal"
"time"

"github.com/dustin/go-humanize"
"github.com/koluku/s3s"
Expand Down Expand Up @@ -38,6 +39,12 @@ var (
isALBLogs bool
isCFLogs bool

duration time.Duration
since time.Time
cliSince cli.Timestamp
until time.Time
cliUntil cli.Timestamp

// command option
threadCount int
maxRetries int
Expand Down Expand Up @@ -103,6 +110,25 @@ func main() {
Usage: "",
Destination: &isCFLogs,
},
&cli.DurationFlag{
Name: "duration",
Usage: `from current time if alb or cf (ex: "2h3m")`,
Destination: &duration,
},
&cli.TimestampFlag{
Name: "since",
Usage: `end at if alb or cf (ex: "2006-01-02 15:04:05")`,
Layout: "2006-01-02 15:04:05",
Timezone: time.UTC,
Destination: &cliSince,
},
&cli.TimestampFlag{
Name: "until",
Usage: `start at if alb or cf (ex: "2006-01-02 15:04:05")`,
Layout: "2006-01-02 15:04:05",
Timezone: time.UTC,
Destination: &cliUntil,
},
&cli.IntFlag{
Name: "thread-count",
Aliases: []string{"t, thread_count"},
Expand Down Expand Up @@ -138,6 +164,13 @@ func main() {
},
},
Action: func(c *cli.Context) error {
if cliSince.Value() != nil {
since = *cliSince.Value()
}
if cliUntil.Value() != nil {
until = *cliUntil.Value()
}

if err := cmd(c.Context, c.Args().Slice()); err != nil {
return errors.WithStack(err)
}
Expand Down Expand Up @@ -188,6 +221,9 @@ func cmd(ctx context.Context, paths []string) error {
queryInfo := &s3s.QueryInfo{
IsCountMode: isCount,
}
keyInfo := &s3s.KeyInfo{
KeyType: s3s.KeyTypeNone,
}
switch {
case isCSV:
queryInfo.FormatType = s3s.FormatTypeCSV
Expand All @@ -197,23 +233,39 @@ func cmd(ctx context.Context, paths []string) error {
queryInfo.FormatType = s3s.FormatTypeALBLogs
queryInfo.FieldDelimiter = " "
queryInfo.RecordDelimiter = "\n"
keyInfo.KeyType = s3s.KeyTypeALB
if duration != 0 {
keyInfo.Since = time.Now().UTC().Add(duration * -1)
} else {
keyInfo.Since = since
keyInfo.Until = until
}
case isCFLogs:
queryInfo.FormatType = s3s.FormatTypeCFLogs
queryInfo.FieldDelimiter = "\t"
queryInfo.RecordDelimiter = "\n"
keyInfo.KeyType = s3s.KeyTypeCF
if duration != 0 {
keyInfo.Since = time.Now().UTC().Add(duration * -1)
} else {
keyInfo.Since = since
keyInfo.Until = until
}
default:
queryInfo.FormatType = s3s.FormatTypeJSON
}

if isDryRun {
scanByte, count, err := app.DryRun(ctx, paths, queryStr, queryInfo)
scanByte, count, err := app.DryRun(ctx, paths, keyInfo, queryStr, queryInfo)
if err != nil {
return errors.WithStack(err)
}
fmt.Printf("all scan byte: %s\n", humanize.Bytes(uint64(scanByte)))
fmt.Printf("file count: %s\n", humanize.Comma(int64(count)))
} else {
app.Run(ctx, paths, queryStr, queryInfo)
if err := app.Run(ctx, paths, keyInfo, queryStr, queryInfo); err != nil {
return errors.WithStack(err)
}
}

// Finalize
Expand Down
113 changes: 102 additions & 11 deletions prefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@ package s3s

import (
"context"
"regexp"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/pkg/errors"
)

const (
ErrTimeParseFailed = "time parce failed"
)

func (app *App) GetS3Dir(ctx context.Context, bucket string, prefix string) ([]string, error) {
input := &s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Expand Down Expand Up @@ -38,33 +44,118 @@ func (app *App) GetS3Dir(ctx context.Context, bucket string, prefix string) ([]s
return s3Keys, nil
}

type KeyType int

const (
KeyTypeNone KeyType = iota
KeyTypeALB
KeyTypeCF
)

type KeyInfo struct {
KeyType KeyType
Since time.Time
Until time.Time
}

type ObjectInfo struct {
Bucket string
Key string
Size int64
}

func (app *App) GetS3Keys(ctx context.Context, sender chan<- ObjectInfo, bucket string, prefix string) error {
func (app *App) GetS3OneKey(ctx context.Context, bucket string, prefix string) (*ObjectInfo, error) {
input := &s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Prefix: aws.String(prefix),
MaxKeys: 1,
}
output, err := app.s3.ListObjectsV2(ctx, input)
if err != nil {
return nil, errors.WithStack(err)
}

return &ObjectInfo{
Bucket: bucket,
Key: *output.Contents[0].Key,
Size: output.Contents[0].Size,
}, nil
}

func (app *App) GetS3Keys(ctx context.Context, sender chan<- ObjectInfo, bucket string, prefix string, info *KeyInfo) error {
input := &s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Prefix: aws.String(prefix),
}
pagenator := s3.NewListObjectsV2Paginator(app.s3, input)

for pagenator.HasMorePages() {
output, err := pagenator.NextPage(ctx)
if err != nil {
return errors.WithStack(err)
switch info.KeyType {
case KeyTypeALB:
for pagenator.HasMorePages() {
output, err := pagenator.NextPage(ctx)
if err != nil {
return errors.WithStack(err)
}
for i := range output.Contents {
if isTimeZeroRange(info.Since, info.Until) {
sender <- ObjectInfo{
Bucket: bucket,
Key: *output.Contents[i].Key,
Size: output.Contents[i].Size,
}
continue
}
isWithin, err := isTimeWithinWhenALB(*output.Contents[i].Key, info.Since, info.Until)
if err != nil {
return errors.WithStack(err)
}
if isWithin {
sender <- ObjectInfo{
Bucket: bucket,
Key: *output.Contents[i].Key,
Size: output.Contents[i].Size,
}
continue
}
}
}

for i := range output.Contents {
sender <- ObjectInfo{
Bucket: bucket,
Key: *output.Contents[i].Key,
Size: output.Contents[i].Size,
default:
for pagenator.HasMorePages() {
output, err := pagenator.NextPage(ctx)
if err != nil {
return errors.WithStack(err)
}
for i := range output.Contents {
sender <- ObjectInfo{
Bucket: bucket,
Key: *output.Contents[i].Key,
Size: output.Contents[i].Size,
}
}
}
}

return nil
}

func isTimeZeroRange(since time.Time, until time.Time) bool {
return since.IsZero() && until.IsZero()
}

func isTimeWithinWhenALB(key string, since time.Time, until time.Time) (bool, error) {
rep := regexp.MustCompile(`_\d{8}T\d{4}Z_`)
timeStr := rep.FindString(key)

t, err := time.Parse("_20060102T1504Z_", timeStr)
if err != nil {
return false, errors.Wrap(err, ErrTimeParseFailed)
}

if !since.IsZero() && t.Before(since) {
return false, nil
}
if !until.IsZero() && t.After(until) {
return false, nil
}
return true, nil
}
Loading