文章目录
  1. 1. nodejs 连接kafka踩坑记
    1. 1.1. 背景
    2. 1.2. 收集容器日志输出到kafka
    3. 1.3. 写代码方案-选组件
      1. 1.3.1. kafkajs
      2. 1.3.2. kafka-node
      3. 1.3.3. node-rdkafka

nodejs 连接kafka踩坑记

背景

由于所在的架构团队中很多后台应用是用nodejs写的代码,现在要把nodejs的日志全部记入到公司的kafka中,运维团队只分配给我们一个topic,然后让我们通过结构逻辑去区分。

需求:

  1. 因我的应用全是容器部署,故希望通过不改代码,将容器控制台日志直接输出到kafka
  2. 如果不改代码不可行,则通过改代码的方式解决

kafka-server:

版本号:3.2.0

加密协议:SASL_PLAINTEXT

加密算法:SCRAM-SHA-256

收集容器日志输出到kafka

使用阿里开源的log-pilot来收集容器日志,然后输出到kafka

docker-compose:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# docker-compose: zookeeper + log-pilot (ships container stdout to Kafka)
# plus the hello-node app whose logs are collected via the aliyun_logs_* labels.
version: '3.2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  pilot:
    image: hz-log-pilot:1.0.3
    volumes:
      # log-pilot watches the Docker daemon and reads container log files
      - /var/run/docker.sock:/var/run/docker.sock
      - /:/host
    privileged: true
    environment:
      - LOGGING_OUTPUT=kafka
      - KAFKA_BROKERS=test-kafka-idc-1.xxx.com:9092
      - KAFKA_VERSION=0.11.0
      - KAFKA_USERNAME=admin
      - KAFKA_PASSWORD=xxx
      - KAFKA_MECHANISM=SCRAM-SHA-256
      - KAFKA_SSL=true
      - KAFKA_PROTOCOL=PLAIN
    labels:
      aliyun.global: true

  hello-node:
    image: hello-node:1.0
    ports:
      - "9003:8888"
    # was `vironment:` — typo that silently dropped the whole section
    environment:
      - aliyun_logs_topic-devops=stdout
      # was missing the leading `-`, making the list entry invalid YAML
      - aliyun_logs_topic-devops_format=json
      - aliyun_logs_topic-devops_tags=appId=backend,appName=test-app

经过测试发现这个在我厂的kafka服务端的配置中,是无法将日志发送过去的,但在我自己搭建的kafka服务器上可以正常输出到kafka。

后面经过一通查找,发现log-pilot这个组件底层使用的filebeat是不支持SCRAM-SHA-256加密的,气死。。。。。

写代码方案-选组件

必应了一下,关键字: nodejs kafka

发现很多人都推荐使用kafkajs来对接nodejs。

https://kafka.js.org/docs/getting-started

https://github.com/tulios/kafkajs

然后写了个test:

kafkajs

kafka.test-spec.ts:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// kafka.test-spec.ts — smoke test: produce one message to Kafka via kafkajs.
// CompressionTypes is a named export of the kafkajs module, NOT a static
// property of the Kafka class (Kafka.CompressionTypes was undefined).
const { Kafka, CompressionTypes, logLevel } = require('kafkajs');

// NOTE(review): the broker advertises SASL_PLAINTEXT + SCRAM-SHA-256;
// kafkajs negotiates SCRAM over a plaintext socket when `ssl` is unset.
const kafka = new Kafka({
  // clientId: 'devops',
  logLevel: logLevel.DEBUG, // 5 — verbose output while debugging connectivity
  brokers: ['10.105.141.164:49171'],
  authenticationTimeout: 1000,
  reauthenticationThreshold: 10000,
  sasl: {
    mechanism: 'scram-sha-256', // kafkajs expects lowercase mechanism names
    username: 'admin',
    password: 'xxxx',
  },
});

const topic = 'test1';
const producer = kafka.producer();

// Connect, send one JSON payload, and log the delivery result (or error).
const sendMessage = async () => {
  await producer.connect();
  return producer
    .send({
      topic,
      compression: CompressionTypes.None,
      // kafkajs messages are { key?, value, partition? } objects — the
      // original used kafka-node's {topic, messages} payload shape, which
      // kafkajs would serialize as an empty/invalid record.
      messages: [
        { value: JSON.stringify({ name: 'jack', age: '120' }), partition: 0 },
      ],
    })
    .then(console.log)
    .catch((e) => console.error(`[example/producer] ${e.message}`, e));
};

describe('kafka tests', function () {
  describe('send Tests', function () {
    this.timeout(15000);
    // An async mocha test must NOT also take `done`: returning the promise
    // is sufficient, and mixing both makes newer mocha versions throw
    // "Resolution method is overspecified". The stray `, 4000)` third
    // argument to it() was invalid and is dropped.
    it('should connection success', async function () {
      await sendMessage();
    });
  });
});

webstorm(集成nodejs单元测试)运行一下,发现无法连接:

image-20210811175832351

原因是:kafka-server端使用的加密协议是SASL_PLAINTEXT + SCRAM-SHA-256,但kafkajs当时只支持plain机制,所以无法连接。

再加上kafka-server端使用的版本是:2.3.0,虽然kafkajs说支持kafka的版本是0.11+,有可能2.x还不支持。

后面又看到说kafka-node也挺火,所以又写了个test。

kafka-node

https://www.npmjs.com/package/kafka-node/v/5.0.0#producer

Kafka-node is a Node.js client for Apache Kafka 0.9 and later.

https://github.com/SOHU-Co/kafka-node

是搜狐(SOHU-Co)开源的库,说是支持0.9之后的版本,上代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
var assert = require('assert');

var expect = require('chai').expect;

// kafka-node smoke test.
// NOTE(review): kafka-node's SASL support only implements the PLAIN
// mechanism, so a broker requiring SCRAM-SHA-256 (as configured here)
// never authenticates — this test documents that connection failure.
var kafka = require('kafka-node'),
  Producer = kafka.Producer,
  KeyedMessage = kafka.KeyedMessage,
  client = new kafka.KafkaClient({
    kafkaHost: 'test-kafka-idc-1.xxx.com:9092',
    connectTimeout: 3000,
    sasl: { mechanism: 'scram-sha-256', username: 'admin', password: 'xxx' },
  }),
  km = new KeyedMessage('key', 'message');

var producerOption = {
  requireAcks: 1,
  ackTimeoutMs: 100,
  partitionerType: 0, // 默认为第一个分区 (always route to the first partition)
};
// Build the producer once, with its options — the original constructed a
// throwaway `new Producer(client)` first and immediately replaced it.
var producer = new Producer(client, producerOption);

// Single-record payload in kafka-node's {topic, messages, partition} shape.
function getPayloads() {
  return [
    { topic: 'test', messages: JSON.stringify({ name: 'jack', age: '120' }), partition: 0 },
  ];
}

describe('kafka tests', function () {
  describe('send Tests', function () {
    this.timeout(15000);
    // Dropped the invalid `, 6000)` third argument to it() and the needless
    // `async` on the 'ready' handler (nothing inside was awaited).
    it('should connection success', function (done) {
      producer.on('ready', function () {
        producer.send(getPayloads(), function (err, data) {
          if (err) {
            console.log('[kafka-producer -> test1]: broker update failed');
          } else {
            console.log('[kafka-producer -> test1]: broker update success');
          }
          // Propagate the error so a failed send fails the test instead of
          // silently passing (the original always called done()).
          done(err);
        });
      });
      producer.on('error', function (err) {
        console.log('error:' + err.toString());
        done(err); // surface the connection error — was done(), masking it
      });
    });
  });
});

// 函数实现,参数单位 秒 — synchronously block the process for `second`
// seconds. POSIX-only: shells out to `sleep` via execSync.
function wait(second) {
  // execSync 属于同步方法;异步方式请根据需要自行查询 node.js 的 child_process 相关方法;
  let ChildProcess_ExecSync = require('child_process').execSync;
  ChildProcess_ExecSync('sleep ' + second);
}

还是无法连接,报连接超时:

image-20210811183231115

在它的GitHub仓库上发现,已经两年没更新了,看来是不维护了,估计还是不支持那个协议,或者是客户端版本不匹配的问题。

还是上kafka官网看看吧,在官网上看到kafka client端支持:

https://cwiki.apache.org/confluence/display/KAFKA/Clients

发现了nodejs的支持的库,官方推荐排在第一的是:node-rdkafka

image-20210811183700339

虽然kafka-node排第二,但是貌似不支持我厂kafka服务端的协议配置。

node-rdkafka

https://github.com/Blizzard/node-rdkafka

同样写个test, rd-kafka.test-spec.ts:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
var assert = require('assert');

var expect = require('chai').expect;
const Kafka = require('node-rdkafka');

// librdkafka error code returned by AdminClient.createTopic when the
// topic already exists — treated as success below.
const ERR_TOPIC_ALREADY_EXISTS = 36;

// Broker connection settings: SASL_PLAINTEXT + SCRAM-SHA-256, which
// librdkafka (and therefore node-rdkafka) supports natively.
const config = {
  'bootstrap.servers': 'test-kafka-idc-1.xxx.com:9092',
  'sasl.username': 'admin',
  'sasl.password': 'xxx',
  'security.protocol': 'SASL_PLAINTEXT',
  'sasl.mechanisms': 'SCRAM-SHA-256',
};

const topic = 'topic-devops';

describe('kafka tests', function () {
  describe('send Tests', function () {
    this.timeout(15000);
    // Return the promise instead of taking `done`: the original declared
    // `async function (done)` and never called done(), so the test always
    // timed out even when the produce succeeded. The stray `, 6000)` third
    // argument to it() was invalid and is dropped.
    it('should connection success', async function () {
      await produceExample();
    });
  });
});

// Create `topic` if it does not exist; resolves when the topic is present
// either way, rejects on any other admin error.
function ensureTopicExists() {
  const adminClient = Kafka.AdminClient.create(config);

  return new Promise((resolve, reject) => {
    adminClient.createTopic(
      {
        topic: topic,
        num_partitions: 1,
        replication_factor: 3,
      },
      (err) => {
        if (!err) {
          // was `${config.topic}`, which is undefined — log the real name
          console.log(`Created topic ${topic}`);
          return resolve();
        }

        if (err.code === ERR_TOPIC_ALREADY_EXISTS) {
          return resolve();
        }

        return reject(err);
      }
    );
  });
}

// Produce ten JSON records, then flush and disconnect the producer.
async function produceExample() {
  // await ensureTopicExists();

  const producer = await createProducer((err, report) => {
    if (err) {
      console.warn('Error producing', err);
    } else {
      const { topic, partition, value } = report;
      console.log(`Successfully produced record to topic "${topic}" partition ${partition} ${value}`);
    }
  });

  for (let idx = 0; idx < 10; ++idx) {
    const key = 'alice';
    const value = Buffer.from(JSON.stringify({ name: key + idx, count: idx }));

    console.log(`Producing record ${key}\t${value}`);

    // partition -1 lets librdkafka's partitioner choose the partition.
    producer.produce(topic, -1, value, key);
  }

  producer.flush(10000, () => {
    producer.disconnect();
  });
}

// NOTE: the original also invoked produceExample() at module top level,
// which double-sent every record when mocha loaded the spec and left a
// floating promise; the it() above is now the single entry point.

// Build a connected Producer with delivery reports enabled; resolves with
// the producer on 'ready', rejects on the first 'event.error'.
function createProducer(onDeliveryReport) {
  // Copy the shared config instead of mutating it — the original aliased
  // `config` and set dr_msg_cb on the shared object as a side effect,
  // leaking the flag into every other client built from `config`.
  const proConfig = Object.assign({}, config, { dr_msg_cb: true });
  const producer = new Kafka.Producer(proConfig);

  return new Promise((resolve, reject) => {
    producer
      .on('ready', () => resolve(producer))
      .on('delivery-report', onDeliveryReport)
      .on('event.error', (err) => {
        console.warn('event.error', err);
        reject(err);
      });
    producer.connect();
  });
}

webstorm执行一下:

image-20210811184052421

发现已经发送成功,到后台查下,发现已经发过去了,太棒了。。。。。。

这个问题之前已经困扰了我差不多一周时间了,特此记录下来,方便后面的同学参考。。。。。。

https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/nodejs.html

https://blizzard.github.io/node-rdkafka/current/

文章目录
  1. 1. nodejs 连接kafka踩坑记
    1. 1.1. 背景
    2. 1.2. 收集容器日志输出到kafka
    3. 1.3. 写代码方案-选组件
      1. 1.3.1. kafkajs
      2. 1.3.2. kafka-node
      3. 1.3.3. node-rdkafka