NAME
    Kafka::Producer::Avro - Avro message producer for Apache Kafka.

SYNOPSIS
        use Kafka::Connection;
        use Kafka::Producer::Avro;

        my $connection = Kafka::Connection->new( host => 'localhost' );

        my $producer = Kafka::Producer::Avro->new( Connection => $connection );

        # Do some interactions with Avro & SchemaRegistry before sending messages

        # Sending a single message
        my $response = $producer->send(...);

        # Sending a series of messages
        $response = $producer->send(...);

        # Closes the producer and cleans up
        undef $producer;
        $connection->close;
        undef $connection;

DESCRIPTION
    "Kafka::Producer::Avro" main feature is to provide object-oriented API
    to produce messages according to *Confluent SchemaRegistry* and *Avro*
    serialization.

    "Kafka::Producer::Avro" inerhits from and extends Kafka::Producer.

  CONSTRUCTOR
   "new"
    Creates new producer client object.

    "new()" takes arguments in key-value pairs as described in
    Kafka::Producer from which it inherits.

    In addition, takes in the following arguments:

    "SchemaRegistry => $schema_registry" (mandatory)
       Is a Confluent::SchemaRegistry instance.

  METHODS
    The following methods are defined for the "Kafka::Avro::Producer" class:

   "schema_registry"()
    Returns the Confluent::SchemaRegistry instance supplied to the
    construcor.

   "get_error"()
    Returns a string containing last error message.

   "send( %params )"
    Sends a messages on a Kafka::Connection object.

    Returns a non-blank value (a reference to a hash with server response
    description) if the message is successfully sent.

    Despite Kafka::Producer method that expects positional arguments,
    "Kafka::Producer::Avro-"send()> method looks for named parameters:

      $producer->send(
            topic             => $topic,             # scalar 
            partition         => $partition,         # scalar
            messages          => $messages,          # scalar | array
            keys              => $keys,              # scalar | array
            compression_codec => $compression_codec, # scalar
            key_schema        => $key_schema,        # optional JSON-string
            value_schema      => $value_schema       # optional JSON-string
      );

    Extra arguments may be suggested:

    "key_schema => $key_schema" and "value_schema => $value_schema"
       Both $key_schema and $value_schema parameters are optional and
       provide JSON strings that represent Avro schemas to use to validate
       and serialize key(s) and value(s).

       These schemas are validated against "schema_registry" and, if
       compliant, they are added to the registry under the "$topic+'key'" or
       "$topic+'value'" subjects.

       If an expected schema isn't provided, latest version from Schema
       Registry is used accordingly to the subject (key or value).

   "bulk_send( %params )"
    Similar to "send" but uses bulks to avoid memory leaking.

    Extra named parameters are expected:

    "size => $size"
       The size of the bulk

    "on_before_send_bulk => sub {...} " (optional)
       A code block that will be executed before the sending of each bulk.

       The block will receive the following positional parameters:

       $bulk_num the number of the bulk
       $bulk_messages the number of messages in the bulk
       $bulk_keys the number of keys in the bulk
       $index_from the absolute index of the first message in the bulk
       $index_to the absolute index of the last message in the bulk

    "on_after_send_bulk => sub {...} " (optional)
       A code block that will be executed after the sending of each bulk.

       The block will receive the following positional parameters:

       $sent the number of sent messages in the bulk
       $total_sent the total number of messages sent

    "on_init => sub {...} " (optional)
       A code block that will be executed only once before at the beginning
       of the cycle.

       The block will receive the following positional parameters:

       $to_send the total number of messages to send
       $bulk_size the size of the bulk

    "on_complete => sub {...} " (optional)
       A code block that will be executed only once after the end of the
       cycle.

       The block will receive the following positional parameters:

       $to_send the total number of messages to send
       $total_sent the total number of messages sent
       $errors the number bulks sent with errors

    "on_send_error => sub {...} " (optional)
       A code block that will be executed when a bulk registers an error.

AUTHOR
    Alvaro Livraghi, <alvarol@cpan.org>

CONTRIBUTE
    <https://github.com/alivraghi/Kafka-Producer-Avro>

BUGS
    Please use GitHub project link above to report problems or contact
    authors.

COPYRIGHT AND LICENSE
    Copyright 2018 by Alvaro Livraghi

    This program is free software; you can redistribute it and/or modify it
    under the same terms as Perl itself.