Thursday, 1 June 2017

kafka-node on ready event is not getting triggered

I have created singleton class for Kafka client and producer to create only one object. I need to publish the same topics multiple times without creating new Client and producer instance. I found producer.on('ready',fn(){}) is not getting triggered using same client and producer instance ,only first time it gets triggered when i am having new client and producer object.

Here the sample code:

Singleton class:

const kafka = require('kafka-node');
const logger = require('./../../../../applogger');
const kafkaConfig = require('./../../../../config/config');

function ConnectionProvider() {
    let kafkaConnection = undefined;
    let client = undefined;

    this.getConnection = () => {

        if (!this.kafkaConnection) {
            logger.info("Creating new kafka connection ------------------------------------- ");
            this.client = new kafka.Client(kafkaConfig.ZOOKPER_HOST);
            this.kafkaConnection = new kafka.Producer(this.client);
        }
        return this.kafkaConnection;
    };
    this.getClient = () => {
        if (!this.client) {
            logger.info("Creating new kafka Client ------------------------------------- ");
            this.client = new kafka.Client(kafkaConfig.ZOOKPER_HOST);
        }
        return this.client;

    }
    process.on('SIGINT', function() {
        logger.info("Going to terminate kafka connection...!");
        process.exit(0);
    });
}
module.exports = exports = new ConnectionProvider;

Topic publish Method:

const kafkaClient = require('./../core/kafkaConnection');

    const publishToKafka = function(dataPayload, callback) {
        logger.debug('Publishing to topic ', topicName, ' with data: ', dataPayload);
        let producer = kafkaClient.getConnection();

        producer.on('ready', function() {
            let payloads = dataPayload;
            producer.send(payloads, function(err, data) {
                if (err) {
                    logger.error(
                        'Error in publishing message to messaging pipeline ', err
                    );
                    callback(err, null);
                    return;
                }

                logger.debug('Published message to messaging pipeline topic ', topicName, ' with result: ', data);

                callback(null, data);
                return;
            });
        });

        producer.on('error', function(err) {
            logger.error(
                'Error in publishing message to messaging pipeline ', err
            );
            producer.close();
        });

    };

DataPayload is : let dataPayload=[{topic:someTopic,message:someMessage}]

I need to call PublishToKafka method multiple times but want to create only one kafka client and producer instance. But producer is not publishing Topics because producer.on('ready',function(){}) not getting triggered while using the same object of client and producer.

Thanks in advance.



via Pankush

No comments:

Post a Comment