Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deprecate output-http non-compatible clients #833

Merged
merged 7 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
530 changes: 0 additions & 530 deletions http_client.go

This file was deleted.

532 changes: 0 additions & 532 deletions http_client_test.go

This file was deleted.

37 changes: 17 additions & 20 deletions http_prettifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bytes"
"compress/gzip"
"fmt"
"io/ioutil"
"net/http/httputil"
"strconv"
Expand All @@ -15,6 +16,13 @@ func prettifyHTTP(p []byte) []byte {
head := p[:headSize]
body := p[headSize:]

tEnc := bytes.Equal(proto.Header(body, []byte("Transfer-Encoding")), []byte("chunked"))
cEnc := bytes.Equal(proto.Header(body, []byte("Content-Encoding")), []byte("gzip"))

if !(tEnc || cEnc) {
return p
}

headersPos := proto.MIMEHeadersEndPos(body)

if headersPos < 5 || headersPos > len(body) {
Expand All @@ -24,23 +32,8 @@ func prettifyHTTP(p []byte) []byte {
headers := body[:headersPos]
content := body[headersPos:]

var tEnc, cEnc []byte
proto.ParseHeaders([][]byte{headers}, func(header, value []byte) {
if bytes.EqualFold(header, []byte("Transfer-Encoding")) {
tEnc = value
}

if bytes.EqualFold(header, []byte("Content-Encoding")) {
cEnc = value
}
})

if len(tEnc) == 0 && len(cEnc) == 0 {
return p
}

if bytes.Equal(tEnc, []byte("chunked")) {
buf := bytes.NewBuffer(content)
if tEnc {
buf := bytes.NewReader(content)
r := httputil.NewChunkedReader(buf)
content, _ = ioutil.ReadAll(r)

Expand All @@ -50,16 +43,20 @@ func prettifyHTTP(p []byte) []byte {
headers = proto.SetHeader(headers, []byte("Content-Length"), []byte(newLen))
}

if bytes.Equal(cEnc, []byte("gzip")) {
buf := bytes.NewBuffer(content)
if cEnc {
buf := bytes.NewReader(content)
g, err := gzip.NewReader(buf)

if err != nil {
Debug(1, "[Prettifier] GZIP encoding error:", err)
return []byte{}
}

content, _ = ioutil.ReadAll(g)
content, err = ioutil.ReadAll(g)
if err != nil {
Debug(1, fmt.Sprintf("[HTTP-PRETTIFIER] %q", err))
return p
}

headers = proto.DeleteHeader(headers, []byte("Content-Encoding"))

Expand Down
5 changes: 2 additions & 3 deletions input_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func NewKafkaInputWithTLS(address string, config *InputKafkaConfig, tlsConfig *K
con = config.consumer
} else {
var err error
//con, err = sarama.NewConsumer([]string{config.Host}, c)
con, err = sarama.NewConsumer(strings.Split(config.Host, ","), c)

if err != nil {
Expand Down Expand Up @@ -97,9 +96,9 @@ func (i *KafkaInput) Read(data []byte) (int, error) {
return 0, err
}

copy(data, buf)
n := copy(data, buf)

return len(buf), nil
return n, nil

}

Expand Down
43 changes: 13 additions & 30 deletions input_raw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,13 @@ func TestRAWInputIPv4(t *testing.T) {
}
plugins.All = append(plugins.All, input, output)

client := NewHTTPClient("127.0.0.1:"+port, &HTTPClientConfig{})

addr := "http://127.0.0.1:" + port
emitter := NewEmitter(quit)
defer emitter.Close()
go emitter.Start(plugins, Settings.Middleware)
for i := 0; i < 10; i++ {
wg.Add(2)
_, err = client.Get("/")
_, err = http.Get(addr)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -133,15 +132,15 @@ func TestRAWInputNoKeepAlive(t *testing.T) {
}
plugins.All = append(plugins.All, input, output)

client := NewHTTPClient("127.0.0.1:"+port, &HTTPClientConfig{})
addr := "http://127.0.0.1:" + port

emitter := NewEmitter(quit)
go emitter.Start(plugins, Settings.Middleware)

for i := 0; i < 10; i++ {
// request + response
wg.Add(2)
_, err = client.Get("/")
_, err = http.Get(addr)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -198,14 +197,13 @@ func TestRAWInputIPv6(t *testing.T) {
Outputs: []io.Writer{output},
}

client := NewHTTPClient(originAddr, &HTTPClientConfig{})

emitter := NewEmitter(quit)
addr := "http://" + originAddr
go emitter.Start(plugins, Settings.Middleware)
for i := 0; i < 10; i++ {
// request + response
wg.Add(2)
_, err = client.Get("/")
_, err = http.Get(addr)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -256,7 +254,7 @@ func TestInputRAWChunkedEncoding(t *testing.T) {
}))
defer replay.Close()

httpOutput := NewHTTPOutput(replay.URL, &HTTPOutputConfig{Debug: true})
httpOutput := NewHTTPOutput(replay.URL, &HTTPOutputConfig{})

plugins := &InOutPlugins{
Inputs: []io.Reader{input},
Expand All @@ -280,7 +278,7 @@ func TestInputRAWChunkedEncoding(t *testing.T) {
}

func BenchmarkRAWInputWithReplay(b *testing.B) {
var respCounter, reqCounter, replayCounter, capturedBody uint64
var respCounter, reqCounter, replayCounter uint64
wg := &sync.WaitGroup{}
wg.Add(b.N * 3) // reqCounter + replayCounter + respCounter

Expand All @@ -307,15 +305,7 @@ func BenchmarkRAWInputWithReplay(b *testing.B) {

replay := http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer wg.Done()
defer r.Body.Close()
w.Write([]byte("ab"))
atomic.AddUint64(&replayCounter, 1)
data, err := ioutil.ReadAll(r.Body)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
b.Log(err)
}
atomic.AddUint64(&capturedBody, uint64(len(data)))
wg.Done()
}),
}
go replay.Serve(listener0)
Expand All @@ -336,10 +326,9 @@ func BenchmarkRAWInputWithReplay(b *testing.B) {
} else {
atomic.AddUint64(&respCounter, 1)
}
atomic.AddUint64(&capturedBody, uint64(len(data)))
wg.Done()
})
httpOutput := NewHTTPOutput(replayAddr, &HTTPOutputConfig{Debug: false})
httpOutput := NewHTTPOutput(replayAddr, &HTTPOutputConfig{})

plugins := &InOutPlugins{
Inputs: []io.Reader{input},
Expand All @@ -349,22 +338,16 @@ func BenchmarkRAWInputWithReplay(b *testing.B) {
emitter := NewEmitter(quit)
go emitter.Start(plugins, Settings.Middleware)
now := time.Now()
var buf [1 << 20]byte
buf[1<<20-1] = 'a'
client := NewHTTPClient(originAddr, &HTTPClientConfig{ResponseBufferSize: 2 << 20, CompatibilityMode: true})
addr := "http://" + originAddr
for i := 0; i < b.N; i++ {
if i&1 == 0 {
_, err = client.Get("/")
} else {
_, err = client.Post("/", buf[:])
}
_, err = http.Get(addr)
if err != nil {
b.Log(err)
wg.Add(-3)
}
}

wg.Wait()
b.Logf("%d/%d Requests, %d/%d Responses, %d/%d Replayed, %d Bytes in %s\n", reqCounter, b.N, respCounter, b.N, replayCounter, b.N, capturedBody, time.Since(now))
b.Logf("%d/%d Requests, %d/%d Responses, %d/%d Replayed in %s\n", reqCounter, b.N, respCounter, b.N, replayCounter, b.N, time.Since(now))
emitter.Close()
}
12 changes: 6 additions & 6 deletions kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ import (
"github.com/buger/goreplay/proto"
)

// KafkaConfig should contains required information to
// InputKafkaConfig should contain required information to
// build producers.

type InputKafkaConfig struct {
producer sarama.AsyncProducer
consumer sarama.Consumer
Expand All @@ -23,6 +22,7 @@ type InputKafkaConfig struct {
UseJSON bool `json:"input-kafka-json-format"`
}

// OutputKafkaConfig is the representation of Kafka output configuration
type OutputKafkaConfig struct {
producer sarama.AsyncProducer
consumer sarama.Consumer
Expand All @@ -34,8 +34,8 @@ type OutputKafkaConfig struct {
// KafkaTLSConfig should contain TLS certificates for connecting to secured Kafka clusters
type KafkaTLSConfig struct {
CACert string `json:"kafka-tls-ca-cert"`
clientCert string `json:"kafka-tls-client-cert"`
clientKey string `json:"kafka-tls-client-key"`
ClientCert string `json:"kafka-tls-client-cert"`
ClientKey string `json:"kafka-tls-client-key"`
}

// KafkaMessage should contain captured request information that should be
Expand Down Expand Up @@ -77,9 +77,9 @@ func NewTLSConfig(clientCertFile, clientKeyFile, caCertFile string) (*tls.Config
func NewKafkaConfig(tlsConfig *KafkaTLSConfig) *sarama.Config {
config := sarama.NewConfig()
// Configuration options go here
if (tlsConfig != nil) && (tlsConfig.CACert != "") && (tlsConfig.clientCert != "") && (tlsConfig.clientKey != "") {
if (tlsConfig != nil) && (tlsConfig.CACert != "") && (tlsConfig.ClientCert != "") && (tlsConfig.ClientKey != "") {
config.Net.TLS.Enable = true
tlsConfig, err := NewTLSConfig(tlsConfig.clientCert, tlsConfig.clientKey, tlsConfig.CACert)
tlsConfig, err := NewTLSConfig(tlsConfig.ClientCert, tlsConfig.ClientKey, tlsConfig.CACert)
if err != nil {
log.Fatal(err)
}
Expand Down
Loading