diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..e82874a --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,2 @@ +## 0.0.1 +- Initial version for experimental v0 of native support for Java plugins. diff --git a/build.gradle b/build.gradle index 99a843a..f289726 100644 --- a/build.gradle +++ b/build.gradle @@ -1,4 +1,5 @@ import java.nio.file.Files + import static java.nio.file.StandardCopyOption.REPLACE_EXISTING apply plugin: 'java' @@ -7,17 +8,17 @@ apply from: LOGSTASH_CORE_PATH + "/../rubyUtils.gradle" // =========================================================================== // plugin info // =========================================================================== -group 'org.logstashplugins' // must match the package of the main plugin class -version "${file("VERSION").text.trim()}" // read from required VERSION file -description = "RocketMQ Java output implementation" -pluginInfo.licenses = ['Apache-2.0'] // list of SPDX license IDs +group 'org.logstashplugins' // must match the package of the main plugin class +version "${file("VERSION").text.trim()}" // read from required VERSION file +description = "RocketMQ Java output implementation" +pluginInfo.licenses = ['Apache-2.0'] // list of SPDX license IDs pluginInfo.longDescription = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using \$LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" -pluginInfo.authors = ['Joe.Yao'] -pluginInfo.email = ['ylz@flyrise.cn'] -pluginInfo.homepage = "http://www.elastic.co/guide/en/logstash/current/index.html" -pluginInfo.pluginType = "output" -pluginInfo.pluginClass = "RocketMQ" -pluginInfo.pluginName = "rocketmq" // must match the @LogstashPlugin annotation in the main plugin class +pluginInfo.authors = ['Joe.Yao'] +pluginInfo.email = ['ylz@flyrise.cn'] +pluginInfo.homepage = "http://www.elastic.co/guide/en/logstash/current/index.html" +pluginInfo.pluginType = "output" +pluginInfo.pluginClass = "RocketMQ" +pluginInfo.pluginName = "rocketmq" // must match the @LogstashPlugin annotation in the main plugin class // =========================================================================== sourceCompatibility = 1.8 @@ -47,11 +48,17 @@ shadowJar { dependencies { implementation 'org.apache.logging.log4j:log4j-api:2.17.0' - implementation fileTree(dir: LOGSTASH_CORE_PATH, include: "**/logstash-core.jar") + implementation 'org.apache.rocketmq:rocketmq-client:4.9.4' + implementation 'org.apache.rocketmq:rocketmq-acl:4.9.4' + + implementation fileTree(dir: LOGSTASH_CORE_PATH, include: "**/logstash-core-7.17.2.jar") testImplementation 'junit:junit:4.12' testImplementation 'org.jruby:jruby-complete:9.1.13.0' testImplementation 'org.apache.logging.log4j:log4j-core:2.9.1' + + testImplementation 'com.fasterxml.jackson.core:jackson-databind:2.13.0' + testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.13.0' } clean { @@ -68,7 +75,7 @@ tasks.withType(JavaCompile) { options.encoding = 'UTF-8' } -tasks.register("vendor"){ +tasks.register("vendor") { dependsOn shadowJar doLast { String vendorPathPrefix = "vendor/jar-dependencies" @@ -97,7 +104,7 @@ tasks.register("removeObsoleteJars") { } } -tasks.register("gem"){ +tasks.register("gem") { dependsOn = [downloadAndInstallJRuby, removeObsoleteJars, vendor, generateRubySupportFiles] doLast { buildGem(projectDir, buildDir, pluginInfo.pluginFullName() + ".gemspec") diff --git a/gradle.properties b/gradle.properties index ece9ab0..580337e 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,4 @@ org.gradle.jvmargs=-Xmx2g -Dfile.encoding=UTF-8 -org.gradle.daemon=false \ No newline at end of file +org.gradle.daemon=false + +LOGSTASH_CORE_PATH=D:/Joe/projects/opensources/logstash/logstash-core \ No newline at end of file diff --git a/src/main/java/org/logstashplugins/RocketMQ.java b/src/main/java/org/logstashplugins/RocketMQ.java new file mode 100644 index 0000000..485c7f1 --- /dev/null +++ b/src/main/java/org/logstashplugins/RocketMQ.java @@ -0,0 +1,189 @@ +package org.logstashplugins; + +import co.elastic.logstash.api.*; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.Logger; +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.RPCHook; +import org.logstash.plugins.codecs.Line; +import org.logstash.plugins.codecs.Plain; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; + + +// class name must match plugin name +@LogstashPlugin(name = "rocketmq") +public class RocketMQ implements Output { + public static final PluginConfigSpec PRODUCER_GROUP = + PluginConfigSpec.stringSetting("producer_group", "logstash_producer"); + public static final PluginConfigSpec NAMESRV_ADDR = + PluginConfigSpec.stringSetting("namesrv_addr", "", false, true); + public static final PluginConfigSpec TOPIC = + PluginConfigSpec.stringSetting("topic", "", false, true); + public static final PluginConfigSpec TAGS = + PluginConfigSpec.stringSetting("tags", "*"); + public static final PluginConfigSpec ACCESS_KEY = + PluginConfigSpec.stringSetting("access_key", ""); + public static final PluginConfigSpec SECRET_KEY = + PluginConfigSpec.stringSetting("secret_key", ""); + public static final PluginConfigSpec ENABLE_MSG_TRACE = + PluginConfigSpec.booleanSetting("enable_msg_trace", false); + public static final PluginConfigSpec CODEC_CONFIG = + PluginConfigSpec.codecSetting("codec", "java_line"); + public static final PluginConfigSpec CODEC_NAME = + PluginConfigSpec.stringSetting("codec_name", "json_lines"); + + private Logger logger; + private DefaultMQProducer producer; + private final String producerGroup; + private final String namesrvAddr; + private final String topic; + private final String tags; + private final String accessKey; + private final String secretKey; + private final boolean enableMsgTrace; + private Codec codec; + + private final String id; + private final CountDownLatch done = new CountDownLatch(1); + private volatile boolean stopped = false; + + + // all plugins must provide a constructor that accepts id, Configuration, and Context + public RocketMQ(final String id, final Configuration config, final Context context) { + // constructors should validate configuration options + if (context != null) { + this.logger = context.getLogger(this); + } + this.id = id; + this.producerGroup = config.get(PRODUCER_GROUP); + this.namesrvAddr = config.get(NAMESRV_ADDR); + this.topic = config.get(TOPIC); + this.tags = config.get(TAGS); + this.accessKey = config.get(ACCESS_KEY); + this.secretKey = config.get(SECRET_KEY); + this.enableMsgTrace = config.get(ENABLE_MSG_TRACE); + this.codec = config.get(CODEC_CONFIG); +// if (codec == null) { +// throw new IllegalStateException("Unable to obtain codec"); +// } + if (codec == null) { + String codecName = config.get(CODEC_NAME); + log(Level.ERROR, null, "Unable to obtain codec then create by name " + codecName); + if (codecName.contains("json") || codecName.contains("line")) { + this.codec = new Line(null, config, context); + } else { + this.codec = new Plain(null, config, context); + } + } + + prepareProducer(); + } + + @Override + public void output(final Collection events) { + if (stopped) { + return; + } + for (Event event : events) { + // 获取Logstash事件中的消息内容 + byte[] messageContent; + try (ByteArrayOutputStream bao = new ByteArrayOutputStream()) { + codec.encode(event, bao); + messageContent = bao.toByteArray(); + } catch (IOException ex) { + throw new IllegalStateException(ex); + } + // 创建RocketMQ消息实例 + Message message = new Message(topic, tags, messageContent); + message.setKeys(UUID.randomUUID().toString()); + try { + // 发送消息到RocketMQ + SendResult sendResult = producer.send(message); + log(Level.INFO, null, "RocketMQ send result: ".concat(sendResult.toString())); + } catch (InterruptedException ex) {// Compliant; the interrupted state is restored + log(Level.WARN, ex, "Interrupted!"); + /* Clean up whatever needs to be handled before interrupting */ + Thread.currentThread().interrupt(); + } catch (Exception ex) { + log(Level.ERROR, ex, ex.getMessage()); + throw new IllegalStateException(ex); + } + } + } + + private void prepareProducer() { + // 创建RocketMQ Producer实例 + if (producer == null) { + producer = new DefaultMQProducer(producerGroup, getAclRpcHook(), enableMsgTrace, null); + producer.setNamesrvAddr(namesrvAddr); + + // 消息刷盘(主或备)超时或 slave 不可用(返回状态非 SEND_OK),是否尝试发送到其他 broker,默认 false。十分重要消息可以开启 + producer.setRetryAnotherBrokerWhenNotStoreOK(true); + producer.setRetryTimesWhenSendAsyncFailed(3); + producer.setRetryTimesWhenSendFailed(3); + // 使用故障延迟机制,会对获取的MQ进行可用性验证 + producer.setSendLatencyFaultEnable(true); + producer.setSendMsgTimeout(3000); + //设置到broker的心跳 + producer.setHeartbeatBrokerInterval(3000); + //从namesrv获取topic路由 + producer.setPollNameServerInterval(3000); + + try { + producer.start(); + } catch (MQClientException e) { + log(Level.ERROR, e, e.getMessage()); + throw new IllegalStateException("Unable to obtain producer"); + } + } + } + + private RPCHook getAclRpcHook() { + if (accessKey != null && secretKey != null && !accessKey.isEmpty() && !secretKey.isEmpty()) { + return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)); + } + return null; + } + + private void log(Level level, Throwable t, String msg) { + if (logger != null) { + logger.log(level, msg, t); + } + } + + @Override + public void stop() { + if (producer != null) { + producer.shutdown(); + } + stopped = true; + done.countDown(); + } + + @Override + public void awaitStop() throws InterruptedException { + done.await(); + } + + @Override + public Collection> configSchema() { + // should return a list of all configuration options for this plugin + return PluginHelper.commonOutputSettings(Arrays.asList(PRODUCER_GROUP, NAMESRV_ADDR, TOPIC, TAGS, CODEC_CONFIG, CODEC_NAME)); + } + + @Override + public String getId() { + return id; + } +} diff --git a/src/test/java/org/logstashplugins/RocketmqOutputTest.java b/src/test/java/org/logstashplugins/RocketmqOutputTest.java new file mode 100644 index 0000000..9c4998e --- /dev/null +++ b/src/test/java/org/logstashplugins/RocketmqOutputTest.java @@ -0,0 +1,49 @@ +package org.logstashplugins; + +import co.elastic.logstash.api.Configuration; +import co.elastic.logstash.api.Event; +import org.junit.Test; +import org.logstash.Timestamp; +import org.logstash.plugins.ConfigurationImpl; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public class RocketmqOutputTest { + @Test + public void testRocketMQOutput() { + Map configValues = new HashMap<>(); + configValues.put(RocketMQ.NAMESRV_ADDR.name(), "10.62.16.226:31409"); +// configValues.put(RocketMQ.PRODUCER_GROUP.name(), ""); + configValues.put(RocketMQ.TOPIC.name(), "logstash-output-rocketmq"); +// configValues.put(RocketMQ.TAG.name(), ""); +// configValues.put(RocketMQ.CODEC_NAME.name(), "json_lines"); + + Configuration config = new ConfigurationImpl(configValues); + RocketMQ rocketmq = new RocketMQ("test-id", config, null); + + int eventCount = 5; + Collection events = new ArrayList<>(); + for (int k = 0; k < eventCount; k++) { + events.add(getEvent(k)); + } + rocketmq.output(events); + } + + private static Event getEvent(int index) { + Event event = new org.logstash.Event(); + event.setField("show", "true"); + event.setField("tableName", "cons_staff"); + event.setField("title", "鸿鹄" + index); + event.setField("comIds", "1730762885845381120"); + event.setField("buzType", "通讯录"); + event.setField("url", "cons_staff&1730762885996376064"); + event.setField("fromTypes", "WEB,APP"); + event.setField("createTime", Timestamp.now()); + event.setField("updateTime", Timestamp.now()); + event.setField("publicTime", Timestamp.now()); + return event; + } +}