Browse Source

[ADD] support RocketMQ

master
yaolianzhou 1 year ago
parent
commit
799a89d3fe
  1. 2
      CHANGELOG.md
  2. 33
      build.gradle
  3. 4
      gradle.properties
  4. 189
      src/main/java/org/logstashplugins/RocketMQ.java
  5. 49
      src/test/java/org/logstashplugins/RocketmqOutputTest.java

2
CHANGELOG.md

@ -0,0 +1,2 @@
## 0.0.1
- Initial version for experimental v0 of native support for Java plugins.

33
build.gradle

@ -1,4 +1,5 @@
import java.nio.file.Files import java.nio.file.Files
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING import static java.nio.file.StandardCopyOption.REPLACE_EXISTING
apply plugin: 'java' apply plugin: 'java'
@ -7,17 +8,17 @@ apply from: LOGSTASH_CORE_PATH + "/../rubyUtils.gradle"
// =========================================================================== // ===========================================================================
// plugin info // plugin info
// =========================================================================== // ===========================================================================
group 'org.logstashplugins' // must match the package of the main plugin class group 'org.logstashplugins' // must match the package of the main plugin class
version "${file("VERSION").text.trim()}" // read from required VERSION file version "${file("VERSION").text.trim()}" // read from required VERSION file
description = "RocketMQ Java output implementation" description = "RocketMQ Java output implementation"
pluginInfo.licenses = ['Apache-2.0'] // list of SPDX license IDs pluginInfo.licenses = ['Apache-2.0'] // list of SPDX license IDs
pluginInfo.longDescription = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using \$LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" pluginInfo.longDescription = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using \$LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
pluginInfo.authors = ['Joe.Yao'] pluginInfo.authors = ['Joe.Yao']
pluginInfo.email = ['ylz@flyrise.cn'] pluginInfo.email = ['ylz@flyrise.cn']
pluginInfo.homepage = "http://www.elastic.co/guide/en/logstash/current/index.html" pluginInfo.homepage = "http://www.elastic.co/guide/en/logstash/current/index.html"
pluginInfo.pluginType = "output" pluginInfo.pluginType = "output"
pluginInfo.pluginClass = "RocketMQ" pluginInfo.pluginClass = "RocketMQ"
pluginInfo.pluginName = "rocketmq" // must match the @LogstashPlugin annotation in the main plugin class pluginInfo.pluginName = "rocketmq" // must match the @LogstashPlugin annotation in the main plugin class
// =========================================================================== // ===========================================================================
sourceCompatibility = 1.8 sourceCompatibility = 1.8
@ -47,11 +48,17 @@ shadowJar {
dependencies { dependencies {
implementation 'org.apache.logging.log4j:log4j-api:2.17.0' implementation 'org.apache.logging.log4j:log4j-api:2.17.0'
implementation fileTree(dir: LOGSTASH_CORE_PATH, include: "**/logstash-core.jar") implementation 'org.apache.rocketmq:rocketmq-client:4.9.4'
implementation 'org.apache.rocketmq:rocketmq-acl:4.9.4'
implementation fileTree(dir: LOGSTASH_CORE_PATH, include: "**/logstash-core-7.17.2.jar")
testImplementation 'junit:junit:4.12' testImplementation 'junit:junit:4.12'
testImplementation 'org.jruby:jruby-complete:9.1.13.0' testImplementation 'org.jruby:jruby-complete:9.1.13.0'
testImplementation 'org.apache.logging.log4j:log4j-core:2.9.1' testImplementation 'org.apache.logging.log4j:log4j-core:2.9.1'
testImplementation 'com.fasterxml.jackson.core:jackson-databind:2.13.0'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.13.0'
} }
clean { clean {
@ -68,7 +75,7 @@ tasks.withType(JavaCompile) {
options.encoding = 'UTF-8' options.encoding = 'UTF-8'
} }
tasks.register("vendor"){ tasks.register("vendor") {
dependsOn shadowJar dependsOn shadowJar
doLast { doLast {
String vendorPathPrefix = "vendor/jar-dependencies" String vendorPathPrefix = "vendor/jar-dependencies"
@ -97,7 +104,7 @@ tasks.register("removeObsoleteJars") {
} }
} }
tasks.register("gem"){ tasks.register("gem") {
dependsOn = [downloadAndInstallJRuby, removeObsoleteJars, vendor, generateRubySupportFiles] dependsOn = [downloadAndInstallJRuby, removeObsoleteJars, vendor, generateRubySupportFiles]
doLast { doLast {
buildGem(projectDir, buildDir, pluginInfo.pluginFullName() + ".gemspec") buildGem(projectDir, buildDir, pluginInfo.pluginFullName() + ".gemspec")

4
gradle.properties

@ -1,2 +1,4 @@
org.gradle.jvmargs=-Xmx2g -Dfile.encoding=UTF-8 org.gradle.jvmargs=-Xmx2g -Dfile.encoding=UTF-8
org.gradle.daemon=false org.gradle.daemon=false
LOGSTASH_CORE_PATH=D:/Joe/projects/opensources/logstash/logstash-core

189
src/main/java/org/logstashplugins/RocketMQ.java

@ -0,0 +1,189 @@
package org.logstashplugins;
import co.elastic.logstash.api.*;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.logstash.plugins.codecs.Line;
import org.logstash.plugins.codecs.Plain;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
// class name must match plugin name
@LogstashPlugin(name = "rocketmq")
public class RocketMQ implements Output {
public static final PluginConfigSpec<String> PRODUCER_GROUP =
PluginConfigSpec.stringSetting("producer_group", "logstash_producer");
public static final PluginConfigSpec<String> NAMESRV_ADDR =
PluginConfigSpec.stringSetting("namesrv_addr", "", false, true);
public static final PluginConfigSpec<String> TOPIC =
PluginConfigSpec.stringSetting("topic", "", false, true);
public static final PluginConfigSpec<String> TAGS =
PluginConfigSpec.stringSetting("tags", "*");
public static final PluginConfigSpec<String> ACCESS_KEY =
PluginConfigSpec.stringSetting("access_key", "");
public static final PluginConfigSpec<String> SECRET_KEY =
PluginConfigSpec.stringSetting("secret_key", "");
public static final PluginConfigSpec<Boolean> ENABLE_MSG_TRACE =
PluginConfigSpec.booleanSetting("enable_msg_trace", false);
public static final PluginConfigSpec<Codec> CODEC_CONFIG =
PluginConfigSpec.codecSetting("codec", "java_line");
public static final PluginConfigSpec<String> CODEC_NAME =
PluginConfigSpec.stringSetting("codec_name", "json_lines");
private Logger logger;
private DefaultMQProducer producer;
private final String producerGroup;
private final String namesrvAddr;
private final String topic;
private final String tags;
private final String accessKey;
private final String secretKey;
private final boolean enableMsgTrace;
private Codec codec;
private final String id;
private final CountDownLatch done = new CountDownLatch(1);
private volatile boolean stopped = false;
// all plugins must provide a constructor that accepts id, Configuration, and Context
public RocketMQ(final String id, final Configuration config, final Context context) {
// constructors should validate configuration options
if (context != null) {
this.logger = context.getLogger(this);
}
this.id = id;
this.producerGroup = config.get(PRODUCER_GROUP);
this.namesrvAddr = config.get(NAMESRV_ADDR);
this.topic = config.get(TOPIC);
this.tags = config.get(TAGS);
this.accessKey = config.get(ACCESS_KEY);
this.secretKey = config.get(SECRET_KEY);
this.enableMsgTrace = config.get(ENABLE_MSG_TRACE);
this.codec = config.get(CODEC_CONFIG);
// if (codec == null) {
// throw new IllegalStateException("Unable to obtain codec");
// }
if (codec == null) {
String codecName = config.get(CODEC_NAME);
log(Level.ERROR, null, "Unable to obtain codec then create by name " + codecName);
if (codecName.contains("json") || codecName.contains("line")) {
this.codec = new Line(null, config, context);
} else {
this.codec = new Plain(null, config, context);
}
}
prepareProducer();
}
@Override
public void output(final Collection<Event> events) {
if (stopped) {
return;
}
for (Event event : events) {
// 获取Logstash事件中的消息内容
byte[] messageContent;
try (ByteArrayOutputStream bao = new ByteArrayOutputStream()) {
codec.encode(event, bao);
messageContent = bao.toByteArray();
} catch (IOException ex) {
throw new IllegalStateException(ex);
}
// 创建RocketMQ消息实例
Message message = new Message(topic, tags, messageContent);
message.setKeys(UUID.randomUUID().toString());
try {
// 发送消息到RocketMQ
SendResult sendResult = producer.send(message);
log(Level.INFO, null, "RocketMQ send result: ".concat(sendResult.toString()));
} catch (InterruptedException ex) {// Compliant; the interrupted state is restored
log(Level.WARN, ex, "Interrupted!");
/* Clean up whatever needs to be handled before interrupting */
Thread.currentThread().interrupt();
} catch (Exception ex) {
log(Level.ERROR, ex, ex.getMessage());
throw new IllegalStateException(ex);
}
}
}
private void prepareProducer() {
// 创建RocketMQ Producer实例
if (producer == null) {
producer = new DefaultMQProducer(producerGroup, getAclRpcHook(), enableMsgTrace, null);
producer.setNamesrvAddr(namesrvAddr);
// 消息刷盘(主或备)超时或 slave 不可用(返回状态非 SEND_OK),是否尝试发送到其他 broker,默认 false。十分重要消息可以开启
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
producer.setRetryTimesWhenSendAsyncFailed(3);
producer.setRetryTimesWhenSendFailed(3);
// 使用故障延迟机制,会对获取的MQ进行可用性验证
producer.setSendLatencyFaultEnable(true);
producer.setSendMsgTimeout(3000);
//设置到broker的心跳
producer.setHeartbeatBrokerInterval(3000);
//从namesrv获取topic路由
producer.setPollNameServerInterval(3000);
try {
producer.start();
} catch (MQClientException e) {
log(Level.ERROR, e, e.getMessage());
throw new IllegalStateException("Unable to obtain producer");
}
}
}
private RPCHook getAclRpcHook() {
if (accessKey != null && secretKey != null && !accessKey.isEmpty() && !secretKey.isEmpty()) {
return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
return null;
}
private void log(Level level, Throwable t, String msg) {
if (logger != null) {
logger.log(level, msg, t);
}
}
@Override
public void stop() {
if (producer != null) {
producer.shutdown();
}
stopped = true;
done.countDown();
}
@Override
public void awaitStop() throws InterruptedException {
done.await();
}
@Override
public Collection<PluginConfigSpec<?>> configSchema() {
// should return a list of all configuration options for this plugin
return PluginHelper.commonOutputSettings(Arrays.asList(PRODUCER_GROUP, NAMESRV_ADDR, TOPIC, TAGS, CODEC_CONFIG, CODEC_NAME));
}
@Override
public String getId() {
return id;
}
}

49
src/test/java/org/logstashplugins/RocketmqOutputTest.java

@ -0,0 +1,49 @@
package org.logstashplugins;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Event;
import org.junit.Test;
import org.logstash.Timestamp;
import org.logstash.plugins.ConfigurationImpl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
public class RocketmqOutputTest {
@Test
public void testRocketMQOutput() {
Map<String, Object> configValues = new HashMap<>();
configValues.put(RocketMQ.NAMESRV_ADDR.name(), "10.62.16.226:31409");
// configValues.put(RocketMQ.PRODUCER_GROUP.name(), "");
configValues.put(RocketMQ.TOPIC.name(), "logstash-output-rocketmq");
// configValues.put(RocketMQ.TAG.name(), "");
// configValues.put(RocketMQ.CODEC_NAME.name(), "json_lines");
Configuration config = new ConfigurationImpl(configValues);
RocketMQ rocketmq = new RocketMQ("test-id", config, null);
int eventCount = 5;
Collection<Event> events = new ArrayList<>();
for (int k = 0; k < eventCount; k++) {
events.add(getEvent(k));
}
rocketmq.output(events);
}
private static Event getEvent(int index) {
Event event = new org.logstash.Event();
event.setField("show", "true");
event.setField("tableName", "cons_staff");
event.setField("title", "鸿鹄" + index);
event.setField("comIds", "1730762885845381120");
event.setField("buzType", "通讯录");
event.setField("url", "cons_staff&1730762885996376064");
event.setField("fromTypes", "WEB,APP");
event.setField("createTime", Timestamp.now());
event.setField("updateTime", Timestamp.now());
event.setField("publicTime", Timestamp.now());
return event;
}
}
Loading…
Cancel
Save