|
| 1 | +require 'fluent/output' |
| 2 | + |
| 3 | +module Fluent |
| 4 | + class LogtailOutput < Fluent::BufferedOutput |
| 5 | + Fluent::Plugin.register_output('logtail', self) |
| 6 | + |
| 7 | + VERSION = "0.1.0".freeze |
| 8 | + CONTENT_TYPE = "application/msgpack".freeze |
| 9 | + HOST = "https://in.logtail.com".freeze |
| 10 | + PATH = "/".freeze |
| 11 | + MAX_ATTEMPTS = 3.freeze |
| 12 | + RETRYABLE_CODES = [429, 500, 502, 503, 504].freeze |
| 13 | + USER_AGENT = "Logtail Logstash/#{VERSION}".freeze |
| 14 | + |
| 15 | + config_param :source_token, :string, secret: true |
| 16 | + config_param :ip, :string, default: nil |
| 17 | + |
| 18 | + def configure(conf) |
| 19 | + source_token = conf["source_token"] |
| 20 | + @headers = { |
| 21 | + "Authorization" => "Bearer #{source_token}", |
| 22 | + "Content-Type" => CONTENT_TYPE, |
| 23 | + "User-Agent" => USER_AGENT |
| 24 | + } |
| 25 | + super |
| 26 | + end |
| 27 | + |
| 28 | + def start |
| 29 | + super |
| 30 | + require 'http' |
| 31 | + HTTP.default_options = {:keep_alive_timeout => 29} |
| 32 | + @http_client = HTTP.persistent(HOST) |
| 33 | + end |
| 34 | + |
| 35 | + def shutdown |
| 36 | + @http_client.close if @http_client |
| 37 | + super |
| 38 | + end |
| 39 | + |
| 40 | + def format(tag, time, record) |
| 41 | + record.merge("dt" => Time.at(time).utc.iso8601).to_msgpack |
| 42 | + end |
| 43 | + |
| 44 | + def write(chunk) |
| 45 | + deliver(chunk, 1) |
| 46 | + end |
| 47 | + |
| 48 | + private |
| 49 | + def deliver(chunk, attempt) |
| 50 | + if attempt > MAX_ATTEMPTS |
| 51 | + log.error("msg=\"Max attempts exceeded dropping chunk\" attempt=#{attempt}") |
| 52 | + return false |
| 53 | + end |
| 54 | + |
| 55 | + body = chunk.read |
| 56 | + response = @http_client.headers(@headers).post(PATH, body: body) |
| 57 | + response.flush |
| 58 | + code = response.code |
| 59 | + |
| 60 | + if code >= 200 && code <= 299 |
| 61 | + true |
| 62 | + elsif RETRYABLE_CODES.include?(code) |
| 63 | + sleep_time = sleep_for_attempt(attempt) |
| 64 | + log.warn("msg=\"Retryable response from the Logtail API\" " + |
| 65 | + "code=#{code} attempt=#{attempt} sleep=#{sleep_time}") |
| 66 | + sleep(sleep_time) |
| 67 | + deliver(chunk, attempt + 1) |
| 68 | + else |
| 69 | + log.error("msg=\"Fatal response from the Logtail API\" code=#{code} attempt=#{attempt}") |
| 70 | + false |
| 71 | + end |
| 72 | + end |
| 73 | + |
| 74 | + def sleep_for_attempt(attempt) |
| 75 | + sleep_for = attempt ** 2 |
| 76 | + sleep_for = sleep_for <= 60 ? sleep_for : 60 |
| 77 | + (sleep_for / 2) + (rand(0..sleep_for) / 2) |
| 78 | + end |
| 79 | + end |
| 80 | +end |
0 commit comments