lua-resty-kafka-fast

Name

lua-resty-kafka-fast - Lua Kafka client driver for OpenResty.

Table of Contents

  • Status
  • Description
  • Synopsis
  • Directives
      • lua_kafka_max_clients
  • Methods
      • new_consumer
      • read
      • describe_group
  • Copyright & Licenses

Status

This library is considered production-ready.

Back to TOC

Description

This Lua library is a Kafka client driver for working with the ngx_lua nginx module:

https://github.com/openresty/lua-nginx-module/#readme

Back to TOC

Synopsis

load_module "/usr/local/openresty/nginx/modules/ngx_http_lua_kafka_module.so";

thread_pool lua_kafka threads=16;

http {
    lua_kafka_max_clients 16;

    # you do not need the following line if you are using
    # the OpenResty bundle:
    lua_package_path "/path/to/lua-resty-kafka-fast/lib/?.lua;;";

    init_worker_by_lua_block {
        local is_kafka_client_running = false

        local function kafka_consumer_imp()
            local kafka = require "resty.kafka.fast"
            local topics = {{topic = "topic1"}, {topic = "topic2"}}
            local client, err = kafka.new_consumer("127.0.0.1:9094",
                                    {["auto.offset.reset"] = "beginning",
                                    ["broker.address.family"]="v4"},
                                    "group-name", topics)
            if not client then
                ngx.log(ngx.ERR, "create kafka client failed: ", err)
                return
            else
                ngx.log(ngx.INFO, "create kafka client success")
            end

            -- consume 100 messages
            for i = 1, 100 do
                if ngx.worker.exiting() then
                    break
                end

                local msg, err = client:read()
                if not msg then
                    ngx.log(ngx.ERR, "recv err: ", err)
                else
                    ngx.log(ngx.ERR, "topic: ", msg.topic or "",
                            ", key: ", msg.key or "",
                            ", payload: ", msg.payload or  "",
                            ", offset: ", msg.offset or "",
                            ", partition: ", msg.partition or "")
                end
            end

            client:close()
        end 

        local function kafka_consumer(premature)
            if premature or is_kafka_client_running then
                return
            end
            
            is_kafka_client_running = true

            local ok, err = pcall(kafka_consumer_imp)
            if not ok then
                ngx.log(ngx.ERR, err)
            end

            is_kafka_client_running = false
        end

        local ok, err = ngx.timer.every(10, kafka_consumer)
        if not ok then
            ngx.log(ngx.ERR, "failed to set timer for kafka_consumer: ", err)
            return
        end 
    }
}

Back to TOC

Directives

lua_kafka_max_clients

syntax: lua_kafka_max_clients [num]

default: lua_kafka_max_clients 128

context: http

Specifies the maximum number of Kafka clients allowed at the worker process level.

Each client occupies one thread, so this value should match the number of threads configured for the lua_kafka thread pool.
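
For example, the following sketch keeps the two values in sync (16 is only an illustrative value, matching the Synopsis above):

thread_pool lua_kafka threads=16;

http {
    lua_kafka_max_clients 16;
}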

Back to TOC

Methods

new_consumer

syntax: consumer, err = kafka.new_consumer(brokers, config, group_id, topics)

Creates a Kafka consumer object. In case of failures, returns nil and a string describing the error.

The brokers argument is a comma-separated list of host or host:port (default port 9092). lua-resty-kafka-fast will use the bootstrap brokers to acquire the full set of brokers from the cluster.
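
For example, the following brokers string lists two bootstrap brokers (the host names are hypothetical), the second one using the default port 9092:

"kafka1.example.com:9094,kafka2.example.com"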

The config argument is a Lua table holding the following keys:

  • auto.offset.reset

    Action to take when there is no initial offset in the offset store or the desired offset is out of range: 'smallest', 'earliest' - automatically reset the offset to the smallest offset; 'largest', 'latest' - automatically reset the offset to the largest offset; 'error' - trigger an error which is retrieved by consuming messages and checking err.

    The values: smallest, earliest, beginning, largest, latest, end, error.

    Default value: largest

  • broker.address.family

    Allowed broker IP address families: any, v4, v6

    The values: any, v4, v6.

    Default value: any.

  • enable.partition.eof

    Emit RD_KAFKA_RESP_ERR__PARTITION_EOF event whenever the consumer reaches the end of a partition.

    The values: true, false

    Default value: true

  • security.protocol

    Protocol used to communicate with brokers.

    The values: plaintext, ssl

    Default value: plaintext

  • enable.ssl.certificate.verification

    Enable OpenSSL’s builtin broker (server) certificate verification. The values can be true or false. The default value is true.

  • ssl.key.location

    Path to client’s private key (PEM) used for authentication.

  • ssl.key.pem

    Client’s private key string (PEM format) used for authentication.

  • ssl.key.password

    Private key passphrase.

  • ssl.certificate.location

    Path to client’s public key (PEM) used for authentication.

  • ssl.certificate.pem

    Client’s public key string (PEM format) used for authentication.

  • ssl.ca.location

    File or directory path to CA certificate(s) for verifying the broker’s key.

  • ssl.ca.pem

    CA certificate string (PEM format) for verifying the broker’s key.

  • ssl.cipher.suites

    A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithms used to negotiate the security settings for a network connection using the TLS or SSL network protocol. See the ciphers(1) and SSL_CTX_set_cipher_list(3) manual pages.

  • ssl.crl.location

    Path to CRL for verifying broker’s certificate validity.

The group_id argument is a Lua string holding the group ID of the consumer. This argument can be nil.

The topics argument is a Lua table holding one or more topics. Each topic contains the following fields:

  • topic

    The topic to be consumed. This field must be specified.

  • partition

    The number of the partition to be consumed. This field is optional.

  • offset

    The start offset of the topic to be consumed. This field is optional.

    The values: kafka.OFFSET_BEGINNING, kafka.OFFSET_END, kafka.OFFSET_STORED, or a specific numeric offset (as in the example below)

For example:

{ topic = "topic-name", partition = 0, offset = 1000 }

or

{ topic = "topic-name" }
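
The sketch below combines the pieces above into a consumer created over SSL. The broker address, certificate paths, group ID and topic names are placeholders, and only configuration keys documented above are used:

local kafka = require "resty.kafka.fast"

local consumer, err = kafka.new_consumer(
    "kafka1.example.com:9093",   -- hypothetical SSL listener
    {
        ["auto.offset.reset"] = "earliest",
        ["security.protocol"] = "ssl",
        ["ssl.ca.location"] = "/path/to/kafka-ca.pem",
        ["ssl.certificate.location"] = "/path/to/client-cert.pem",
        ["ssl.key.location"] = "/path/to/client-key.pem",
    },
    "group-name",
    {
        { topic = "topic1", offset = kafka.OFFSET_BEGINNING },
        { topic = "topic2", partition = 0 },
    })
if not consumer then
    ngx.log(ngx.ERR, "failed to create kafka consumer: ", err)
    return
end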

Back to TOC

read

syntax: msg, err = consumer:read()

Read a message from the Kafka server. In case of failures, returns nil and a string describing the error.

Consumer errors are generally to be considered informational as the consumer library will automatically try to recover from all types of errors. To prevent Lua code from being blocked for long periods of time, a “read timeout” error message is returned if no message is received from the Kafka server for more than 1s.

Here are some error messages you might be interested in:

  • “read timeout”
  • “Subscribed topic not available: topic-ts1-tc23: Broker: Unknown topic or partition”
  • “Fetch from broker 101 reached end of partition at offset 4 (HighwaterMark 4)”
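
For example, a consume loop might keep polling through these informational errors and only log them, as in the following sketch (it assumes a consumer created as shown above, and that "read timeout" is returned verbatim):

while not ngx.worker.exiting() do
    local msg, err = consumer:read()
    if not msg then
        -- "read timeout" just means no message arrived within 1s;
        -- other consumer errors are informational and the library recovers on its own
        if err ~= "read timeout" then
            ngx.log(ngx.INFO, "consumer notice: ", err)
        end
    else
        ngx.log(ngx.INFO, "topic: ", msg.topic,
                ", partition: ", msg.partition,
                ", offset: ", msg.offset,
                ", payload: ", msg.payload)
    end
end

consumer:close()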

Back to TOC

describe_group

syntax: msg, err = kafka.describe_group(brokers, config, group_id, topics)

Fetches the offset information of the topics owned by the group.

The brokers, config, and group_id arguments are the same as the corresponding arguments of kafka.new_consumer.

The optional topics argument is a Lua table. Each element in topics contains two fields: topic and partition.

For example:

local msg, err = kafka.describe_group(
    "127.0.0.1:9094",
    {["broker.address.family"]="v4"},
    "group-name")
local msg, err = kafka.describe_group(
    "127.0.0.1:9094",
    {["broker.address.family"]="v4"},
    "group-name",
    {{topic = "topic1", partition = 0}})

Back to TOC

Copyright & Licenses

Copyright (C) 2024 OpenResty Inc. All rights reserved.

This software is proprietary and must not be redistributed or shared at all.

Back to TOC