21#include "nmsg_port_net.h"
26# ifdef HAVE_SYS_ENDIAN_H
27# include <sys/endian.h>
32#include <sys/socket.h>
59#include <protobuf-c/protobuf-c.h>
66#include <librdkafka/rdkafka.h>
78#include "msgmod_plugin.h"
82#include "libmy/crc32c.h"
83#include "libmy/list.h"
84#include "libmy/tree.h"
85#include "libmy/ubuf.h"
86#include "libmy/b64_decode.h"
87#include "libmy/b64_encode.h"
88#include "libmy/vector.h"
89#include "libmy/fast_inet_ntop.h"
91#include "config_file.h"
/*
 * Two-step stringification: XSTR(x) macro-expands x before passing it on
 * to STR().
 * NOTE(review): STR() is not defined in this part of the file; it is
 * expected to be defined immediately above -- confirm.
 */
#define XSTR(x) STR(x)

/*
 * Garbage-collection intervals for sequence sources and fragments.
 * NOTE(review): the unit is not stated here (presumably seconds) -- confirm
 * against the code that consumes these values.
 */
#define NMSG_SEQSRC_GC_INTERVAL 120
#define NMSG_FRAG_GC_INTERVAL 30

/* Number of nanoseconds in one second. */
#define NMSG_NSEC_PER_SEC 1000000000

/* Size of the fixed, in-structure character array used by string buffers. */
#define DEFAULT_STRBUF_ALLOC_SZ 16384

/*
 * Module name prefixes: a fixed stem with the stringified module ABI version
 * appended through adjacent string-literal concatenation.
 */
#define NMSG_FLT_MODULE_PREFIX "nmsg_flt" XSTR(NMSG_FLTMOD_VERSION)
#define NMSG_STATS_MODULE_PREFIX "nmsg_stats" XSTR(NMSG_STATSMOD_VERSION)
#define NMSG_MSG_MODULE_PREFIX "nmsg_msg" XSTR(NMSG_MSGMOD_VERSION)

/* File name suffix of loadable modules. */
#define NMSG_MODULE_SUFFIX ".so"
/*
 * Print a printf-style message to stderr when the library-wide debug level
 * (_nmsg_global_debug) is greater than or equal to `level`.
 *
 * The body is wrapped in do { ... } while (0) so the macro expands to a
 * single statement: it can be followed by ';' and used as the body of an
 * if/else without capturing the caller's `else`.
 */
#define _nmsg_dprintf(level, format, ...) \
do { \
	if (_nmsg_global_debug >= (level)) \
		fprintf(stderr, format, ##__VA_ARGS__); \
} while (0)
/*
 * Same as the global debug print macro, but the threshold is compared against
 * a caller-supplied debug level `var`: the message is written to stderr when
 * `var` is greater than or equal to `level`.
 *
 * The body is wrapped in do { ... } while (0) so the macro expands to a
 * single statement: it can be followed by ';' and used as the body of an
 * if/else without capturing the caller's `else`.
 */
#define _nmsg_dprintfv(var, level, format, ...) \
do { \
	if ((var) >= (level)) \
		fprintf(stderr, format, ##__VA_ARGS__); \
} while (0)
129 nmsg_stream_type_file,
130 nmsg_stream_type_sock,
131 nmsg_stream_type_zmq,
132 nmsg_stream_type_kafka,
133 nmsg_stream_type_null,
/* Forward declaration; the structure is defined elsewhere. */
struct nmsg_container;

/*
 * Library-wide settings, defined in another translation unit.
 * NOTE(review): the exact effect of _nmsg_global_autoclose is not visible
 * here -- confirm against its users.
 */
extern bool _nmsg_global_autoclose;
/* Debug level compared against the `level` argument of _nmsg_dprintf(). */
extern int _nmsg_global_debug;
177 uint64_t sequence_id;
190 uint64_t sequence_id;
192 uint64_t count_dropped;
195 char addr_str[INET6_ADDRSTRLEN];
202 struct sockaddr_storage addr_ss;
211 ProtobufCBinaryData *frags;
232 struct _nmsg_ipreasm *reasm;
237 struct bpf_program userbpf;
245 pthread_mutex_t lock;
258 pthread_mutex_t lock;
269#ifdef HAVE_LIBRDKAFKA
271 const char *key_field;
281 nmsg_stream_type type;
286#ifdef HAVE_LIBRDKAFKA
295 struct timespec lastgc;
305 struct nmsg_brate *brate;
307 struct sockaddr_storage addr_ss;
311 nmsg_input_stream_read_fp stream_read_fp;
316 pthread_mutex_t c_lock;
317 pthread_mutex_t w_lock;
318 nmsg_stream_type type;
323#ifdef HAVE_LIBRDKAFKA
328 nmsg_random_t random;
336 atomic_uint_fast32_t so_sequence_num;
337 uint64_t sequence_id;
355 nmsg_msgmod_t msgmod;
365 nmsg_input_read_fp read_fp;
366 nmsg_input_read_loop_fp read_loop_fp;
370 unsigned filter_msgtype;
384 nmsg_output_write_fp write_fp;
385 nmsg_output_flush_fp flush_fp;
389 unsigned filter_msgtype;
396 ProtobufCMessage *message;
/*
 * Mode selector stored in the `mode` member of a message module closure.
 * NOTE(review): the names suggest key/value versus multi-line parsing of
 * presentation input -- confirm against the code that switches on it.
 */
typedef enum nmsg_msgmod_clos_mode {
	nmsg_msgmod_clos_m_keyval,
	nmsg_msgmod_clos_m_multiline
} nmsg_msgmod_clos_mode;
447 nmsg_msgmod_clos_mode mode;
475 char fixed[DEFAULT_STRBUF_ALLOC_SZ];
/*
 * Finalizer for the alias subsystem; takes no arguments and returns nothing.
 * NOTE(review): presumably releases state set up by the matching alias
 * initializer -- confirm against the implementation.
 */
void _nmsg_alias_fini(void);
/*
 * Internal buffer helpers operating on struct nmsg_buf.
 * NOTE(review): the descriptions below are inferred from the names and
 * signatures only -- confirm against the implementation.
 */

/* Presumably the amount of space still available in `buf`. */
ssize_t _nmsg_buf_avail(struct nmsg_buf *buf);

/* Presumably the amount of space already consumed in `buf`. */
ssize_t _nmsg_buf_used(struct nmsg_buf *buf);

/* Create a buffer; `sz` is presumably its capacity in bytes. */
struct nmsg_buf *_nmsg_buf_new(size_t sz);

/* Destroy the buffer referenced through `buf` (pointer-to-pointer). */
void _nmsg_buf_destroy(struct nmsg_buf **buf);

/* Reset `buf` for reuse. */
void _nmsg_buf_reset(struct nmsg_buf *buf);
/*
 * Dynamically loaded module helpers.
 * NOTE(review): descriptions are inferred from the names and signatures
 * only -- confirm against the implementation.
 */

/* Create a module handle for the shared object at `path`. */
struct nmsg_dlmod *_nmsg_dlmod_init(const char *path);

/* Destroy the module handle referenced through `dlmod` (pointer-to-pointer). */
void _nmsg_dlmod_destroy(struct nmsg_dlmod **dlmod);
510nmsg_message_t _nmsg_message_dup(
struct nmsg_message *msg);
512nmsg_res _nmsg_message_to_json(nmsg_output_t output, nmsg_message_t msg,
struct nmsg_strbuf *sb);
#ifdef HAVE_LIBRDKAFKA
/*
 * Write the value of the field called `name` in `msg` into `sb`, for use as
 * a key. Only declared when built with librdkafka support.
 * Returns an nmsg_res status code.
 */
nmsg_res _nmsg_message_get_field_value_as_key(nmsg_message_t msg, const char *name, struct nmsg_strbuf *sb);
#endif /* HAVE_LIBRDKAFKA */
/*
 * Detach and return the character data held by the string buffer `sb`.
 * NOTE(review): presumably the caller takes ownership of the returned
 * pointer and must free it -- confirm against the implementation.
 */
char *_nmsg_strbuf_detach(struct nmsg_strbuf *sb);
542nmsg_res _input_nmsg_read(nmsg_input_t, nmsg_message_t *);
545nmsg_res _input_nmsg_unpack_container2(
const uint8_t *,
size_t,
unsigned,
Nmsg__Nmsg **);
548#ifdef HAVE_LIBRDKAFKA
554nmsg_res _input_nmsg_deserialize_header(
const uint8_t *,
size_t, ssize_t *,
unsigned *);
557nmsg_res _input_nmsg_read_callback(nmsg_input_t, nmsg_message_t *);
560nmsg_res _input_nmsg_read_null(nmsg_input_t, nmsg_message_t *);
564nmsg_res _input_pcap_read(nmsg_input_t, nmsg_message_t *);
565nmsg_res _input_pcap_read_raw(nmsg_input_t, nmsg_message_t *);
568nmsg_res _input_pres_read(nmsg_input_t, nmsg_message_t *);
571nmsg_res _input_json_read(nmsg_input_t, nmsg_message_t *);
#ifdef HAVE_LIBRDKAFKA
/*
 * Read function for Kafka JSON inputs; same shape as the other per-input-type
 * read functions. Only declared when built with librdkafka support.
 */
nmsg_res _input_kafka_json_read(nmsg_input_t, nmsg_message_t *);
#endif /* HAVE_LIBRDKAFKA */
578void _input_seqsrc_destroy(nmsg_input_t);
582nmsg_input_t _input_open_kafka(
void *s);
585void _output_stop(nmsg_output_t);
586nmsg_output_t _output_open_kafka(
void *s,
size_t bufsz);
589nmsg_res _output_nmsg_flush(nmsg_output_t);
590nmsg_res _output_nmsg_write(nmsg_output_t, nmsg_message_t);
593nmsg_res _output_pres_write(nmsg_output_t, nmsg_message_t);
596nmsg_res _output_json_write(nmsg_output_t, nmsg_message_t);
#ifdef HAVE_LIBRDKAFKA
/*
 * Write and flush functions for Kafka JSON outputs. Only declared when built
 * with librdkafka support. Each returns an nmsg_res status code.
 */
nmsg_res _output_kafka_json_write(nmsg_output_t output, nmsg_message_t msg);
nmsg_res _output_kafka_json_flush(nmsg_output_t);
#endif /* HAVE_LIBRDKAFKA */
/*
 * Byte-rate limiter helpers.
 * NOTE(review): descriptions are inferred from the names and signatures
 * only -- confirm against the implementation.
 */

/* Create a rate limiter for the given target byte rate. */
struct nmsg_brate *_nmsg_brate_init(size_t target_byte_rate);

/* Destroy the rate limiter referenced through the pointer-to-pointer. */
void _nmsg_brate_destroy(struct nmsg_brate **);

/*
 * Sleep as needed to hold the target rate, given the size of the container
 * and the payload counts.
 * NOTE(review): the difference between `n_payloads` and `n` is not visible
 * here -- confirm.
 */
void _nmsg_brate_sleep(struct nmsg_brate *, size_t container_sz, size_t n_payloads, size_t n);
654_nmsg_ipdg_parse_reasm(
struct nmsg_ipdg *dg,
unsigned etype,
size_t len,
655 const u_char *pkt,
struct _nmsg_ipreasm *reasm,
656 unsigned *new_len, u_char *new_pkt,
int *defrag,
Implementing message filter modules.
Base nmsg support header.
nmsg_res(* nmsg_cb_message_read)(nmsg_message_t *msg, void *user)
Callback function for generating nmsg messages.
void(* nmsg_cb_message)(nmsg_message_t msg, void *user)
Callback function for processing nmsg messages.
nmsg_output_type
An enum identifying the underlying implementation of an nmsg_output_t object.
Implementing statistics export modules.
An nmsg_message MUST always have a non-NULL ->np member.
Structure mapping protocol buffer schema fields to nmsg_msgmod_field_type values for "transparent" modules.
Structure exported by message modules to implement a new message type.