17 #ifndef NMSG_PRIVATE_H 18 #define NMSG_PRIVATE_H 20 #include "nmsg_port_net.h" 25 # ifdef HAVE_SYS_ENDIAN_H 26 # include <sys/endian.h> 30 #include <sys/types.h> 31 #include <sys/socket.h> 56 #include <protobuf-c/protobuf-c.h> 63 #include <yajl/yajl_gen.h> 64 #include <yajl/yajl_tree.h> 68 #include "nmsg.pb-c.h" 71 #include "msgmod_plugin.h" 74 #include "libmy/crc32c.h" 75 #include "libmy/list.h" 76 #include "libmy/tree.h" 77 #include "libmy/ubuf.h" 78 #include "libmy/b64_decode.h" 79 #include "libmy/b64_encode.h" 80 #include "libmy/vector.h" 85 #define XSTR(x) STR(x) 87 #define NMSG_SEQSRC_GC_INTERVAL 120 88 #define NMSG_FRAG_GC_INTERVAL 30 89 #define NMSG_NSEC_PER_SEC 1000000000 91 #define NMSG_FLT_MODULE_PREFIX "nmsg_flt" XSTR(NMSG_FLTMOD_VERSION) 92 #define NMSG_MSG_MODULE_PREFIX "nmsg_msg" XSTR(NMSG_MSGMOD_VERSION) 94 #define NMSG_MODULE_SUFFIX ".so" 96 #define _nmsg_dprintf(level, format, ...) \ 98 if (_nmsg_global_debug >= (level)) \ 99 fprintf(stderr, format, ##__VA_ARGS__); \ 102 #define _nmsg_dprintfv(var, level, format, ...) \ 104 if ((var) >= (level)) \ 105 fprintf(stderr, format, ##__VA_ARGS__); \ 111 nmsg_stream_type_file,
112 nmsg_stream_type_sock,
113 nmsg_stream_type_zmq,
114 nmsg_stream_type_null,
121 struct nmsg_container;
141 extern bool _nmsg_global_autoclose;
142 extern int _nmsg_global_debug;
158 uint64_t sequence_id;
171 uint64_t sequence_id;
173 uint64_t count_dropped;
176 char addr_str[INET6_ADDRSTRLEN];
183 struct sockaddr_storage addr_ss;
192 ProtobufCBinaryData *frags;
213 struct _nmsg_ipreasm *reasm;
218 struct bpf_program userbpf;
226 pthread_mutex_t lock;
236 pthread_mutex_t lock;
244 nmsg_stream_type type;
255 struct timespec lastgc;
265 struct nmsg_brate *brate;
267 struct sockaddr_storage addr_ss;
271 nmsg_input_stream_read_fp stream_read_fp;
276 pthread_mutex_t lock;
277 nmsg_stream_type type;
284 nmsg_random_t random;
293 uint64_t sequence_id;
311 nmsg_msgmod_t msgmod;
320 nmsg_input_read_fp read_fp;
321 nmsg_input_read_loop_fp read_loop_fp;
325 unsigned filter_msgtype;
338 nmsg_output_write_fp write_fp;
339 nmsg_output_flush_fp flush_fp;
343 unsigned filter_msgtype;
350 ProtobufCMessage *message;
351 Nmsg__NmsgPayload *np;
/*
 * Mode selector with two enumerators, "keyval" and "multiline".
 * NOTE(review): presumably distinguishes single-line key/value parsing from
 * multi-line value accumulation in the msgmod closure -- confirm against the
 * msgmod implementation; only the enumerator names are visible here.
 */
393 typedef enum nmsg_msgmod_clos_mode {
394 nmsg_msgmod_clos_m_keyval,
395 nmsg_msgmod_clos_m_multiline
396 } nmsg_msgmod_clos_mode;
401 nmsg_msgmod_clos_mode mode;
/*
 * Alias subsystem finalizer: takes no arguments and returns nothing.
 * NOTE(review): presumably releases state created by a matching alias
 * initializer that is not visible in this listing -- confirm.
 */
431 void _nmsg_alias_fini(
void);
/*
 * Internal helpers operating on struct nmsg_buf.
 *
 * _nmsg_buf_avail() and _nmsg_buf_used() each take a buffer and return an
 * ssize_t. NOTE(review): presumably the remaining and consumed byte counts
 * respectively -- confirm against the implementation.
 */
435 ssize_t _nmsg_buf_avail(
struct nmsg_buf *buf);
436 ssize_t _nmsg_buf_used(
struct nmsg_buf *buf);
/* Creates a buffer from a size argument 'sz' and returns a pointer to it. */
437 struct nmsg_buf * _nmsg_buf_new(
size_t sz);
/*
 * Takes the address of a buffer pointer.
 * NOTE(review): the double pointer suggests the caller's pointer is cleared
 * after release -- confirm.
 */
438 void _nmsg_buf_destroy(
struct nmsg_buf **buf);
/* Resets an existing buffer; returns nothing. */
439 void _nmsg_buf_reset(
struct nmsg_buf *buf);
/*
 * Dynamically loaded module handles.
 *
 * _nmsg_dlmod_init() takes a path string and returns a struct nmsg_dlmod
 * pointer. NOTE(review): presumably NULL on failure -- confirm.
 */
443 struct nmsg_dlmod * _nmsg_dlmod_init(
const char *path);
/*
 * Takes the address of a module handle pointer.
 * NOTE(review): presumably unloads the module and clears the caller's
 * pointer -- confirm.
 */
444 void _nmsg_dlmod_destroy(
struct nmsg_dlmod **dlmod);
/*
 * Internal nmsg_message_t constructors.
 *
 * _nmsg_message_from_payload() builds a message handle from an
 * Nmsg__NmsgPayload pointer. NOTE(review): ownership of 'np' after the call
 * is not visible here -- confirm before freeing it in callers.
 */
457 nmsg_message_t _nmsg_message_from_payload(Nmsg__NmsgPayload *np);
/* Returns a message handle derived from an existing struct nmsg_message. */
458 nmsg_message_t _nmsg_message_dup(
struct nmsg_message *msg);
/*
 * Payload helpers.
 *
 * _nmsg_payload_free_all() and _nmsg_payload_calc_crcs() operate on a whole
 * container (Nmsg__Nmsg); _nmsg_payload_free() and _nmsg_payload_size()
 * operate on a single payload (Nmsg__NmsgPayload).
 */
467 void _nmsg_payload_free_all(Nmsg__Nmsg *nc);
468 void _nmsg_payload_calc_crcs(Nmsg__Nmsg *nc);
/*
 * Takes the address of a payload pointer.
 * NOTE(review): presumably clears the caller's pointer after release --
 * confirm.
 */
469 void _nmsg_payload_free(Nmsg__NmsgPayload **np);
/* Returns a size_t for a payload it does not modify (const-qualified). */
470 size_t _nmsg_payload_size(
const Nmsg__NmsgPayload *np);
/*
 * Fragment input: takes an input handle, an output container pointer, and a
 * byte buffer with its length; returns an nmsg_res status.
 * NOTE(review): presumably reassembles fragmented containers and stores the
 * completed one through the Nmsg__Nmsg ** argument -- confirm.
 */
473 nmsg_res _input_frag_read(nmsg_input_t, Nmsg__Nmsg **, uint8_t *buf,
size_t buf_len);
/*
 * NMSG-format input routines. Parameter names are omitted in most of these
 * prototypes, so argument roles below are inferred from types only.
 *
 * _input_nmsg_filter() returns a bool for an input, an unsigned value and a
 * payload. NOTE(review): the meaning of the return value (pass vs. drop) is
 * not visible here -- confirm.
 */
478 bool _input_nmsg_filter(nmsg_input_t,
unsigned, Nmsg__NmsgPayload *);
/* Single-message read and callback-driven loop over an input. */
479 nmsg_res _input_nmsg_read(nmsg_input_t, nmsg_message_t *);
480 nmsg_res _input_nmsg_loop(nmsg_input_t,
int, nmsg_cb_message,
void *);
/*
 * Container unpacking from a byte buffer and length. The "2" variant takes
 * no input handle, a const buffer, and an extra unsigned argument.
 * NOTE(review): the unsigned is presumably header flags -- confirm.
 */
481 nmsg_res _input_nmsg_unpack_container(nmsg_input_t, Nmsg__Nmsg **, uint8_t *,
size_t);
482 nmsg_res _input_nmsg_unpack_container2(
const uint8_t *,
size_t,
unsigned, Nmsg__Nmsg **);
/* Per-transport container readers: file, socket and zmq. */
483 nmsg_res _input_nmsg_read_container_file(nmsg_input_t, Nmsg__Nmsg **);
484 nmsg_res _input_nmsg_read_container_sock(nmsg_input_t, Nmsg__Nmsg **);
486 nmsg_res _input_nmsg_read_container_zmq(nmsg_input_t, Nmsg__Nmsg **);
/*
 * Header deserialization from a const buffer and length, with two output
 * pointers (ssize_t * and unsigned *).
 * NOTE(review): presumably message size and flags -- confirm.
 */
488 nmsg_res _input_nmsg_deserialize_header(
const uint8_t *,
size_t, ssize_t *,
unsigned *);
/* Read variant for callback-backed inputs. */
491 nmsg_res _input_nmsg_read_callback(nmsg_input_t, nmsg_message_t *);
/* Read and loop variants for "null" inputs. */
494 nmsg_res _input_nmsg_read_null(nmsg_input_t, nmsg_message_t *);
495 nmsg_res _input_nmsg_loop_null(nmsg_input_t,
int, nmsg_cb_message,
void *);
/*
 * Format-specific single-message readers. All four share one signature: an
 * input handle plus an output message pointer, returning nmsg_res.
 * NOTE(review): judging by the names these cover pcap, raw pcap,
 * presentation-format and JSON input -- confirm.
 */
498 nmsg_res _input_pcap_read(nmsg_input_t, nmsg_message_t *);
499 nmsg_res _input_pcap_read_raw(nmsg_input_t, nmsg_message_t *);
502 nmsg_res _input_pres_read(nmsg_input_t, nmsg_message_t *);
505 nmsg_res _input_json_read(nmsg_input_t, nmsg_message_t *);
/*
 * Sequence-source tracking for an input.
 *
 * _input_seqsrc_get() returns a struct nmsg_seqsrc pointer for an input and
 * a container; _input_seqsrc_destroy() takes only the input;
 * _input_seqsrc_update() returns a size_t.
 * NOTE(review): the size_t is presumably a count of dropped or missed
 * containers -- confirm against the implementation.
 */
508 struct nmsg_seqsrc * _input_seqsrc_get(nmsg_input_t, Nmsg__Nmsg *);
509 void _input_seqsrc_destroy(nmsg_input_t);
510 size_t _input_seqsrc_update(nmsg_input_t,
struct nmsg_seqsrc *, Nmsg__Nmsg *);
/* Stops an output; takes only the output handle and returns nothing. */
513 void _output_stop(nmsg_output_t);
/*
 * Fragmenting write path for an output; returns an nmsg_res status.
 * NOTE(review): presumably splits the pending container into fragments
 * before writing -- confirm.
 */
516 nmsg_res _output_frag_write(nmsg_output_t);
/*
 * NMSG-format output routines. flush and write_container take only the
 * output handle; write additionally takes a message.
 */
519 nmsg_res _output_nmsg_flush(nmsg_output_t);
520 nmsg_res _output_nmsg_write(nmsg_output_t, nmsg_message_t);
521 nmsg_res _output_nmsg_write_container(nmsg_output_t);
/*
 * Per-transport writers (socket, file, zmq), each taking a byte buffer and
 * its length.
 * NOTE(review): ownership of 'buf' after the call is not visible here --
 * confirm before freeing it in callers.
 */
522 nmsg_res _output_nmsg_write_sock(nmsg_output_t, uint8_t *buf,
size_t len);
523 nmsg_res _output_nmsg_write_file(nmsg_output_t, uint8_t *buf,
size_t len);
525 nmsg_res _output_nmsg_write_zmq(nmsg_output_t, uint8_t *buf,
size_t len);
/*
 * Format-specific message writers sharing one signature: an output handle
 * plus a message, returning nmsg_res.
 * NOTE(review): judging by the names these emit presentation-format and JSON
 * output -- confirm.
 */
529 nmsg_res _output_pres_write(nmsg_output_t, nmsg_message_t);
532 nmsg_res _output_json_write(nmsg_output_t, nmsg_message_t);
/*
 * Byte-rate helper objects.
 *
 * _nmsg_brate_init() takes a target byte rate and returns a struct
 * nmsg_brate pointer.
 */
535 struct nmsg_brate * _nmsg_brate_init(
size_t target_byte_rate);
/*
 * Takes the address of a struct nmsg_brate pointer.
 * NOTE(review): presumably clears the caller's pointer after release --
 * confirm.
 */
536 void _nmsg_brate_destroy(
struct nmsg_brate **);
/*
 * Takes the rate object, a container size, a payload count and an index 'n'.
 * NOTE(review): presumably sleeps as needed to hold output at the target
 * rate; the exact role of 'n' relative to 'n_payloads' is not visible
 * here -- confirm.
 */
537 void _nmsg_brate_sleep(
struct nmsg_brate *,
size_t container_sz,
size_t n_payloads,
size_t n);
586 _nmsg_ipdg_parse_reasm(
struct nmsg_ipdg *dg,
unsigned etype,
size_t len,
587 const u_char *pkt,
struct _nmsg_ipreasm *reasm,
588 unsigned *new_len, u_char *new_pkt,
int *defrag,
Structure exported by message modules to implement a new message type.
An nmsg_message MUST always have a non-NULL ->np member.
Implementing message filter modules.
Base nmsg support header.
nmsg_res(* nmsg_cb_message_read)(nmsg_message_t *msg, void *user)
Callback function for generating nmsg messages.
Structure mapping protocol buffer schema fields to nmsg_msgmod_field_type values for "transparent" modules.
nmsg_output_type
An enum identifying the underlying implementation of an nmsg_output_t object.
void(* nmsg_cb_message)(nmsg_message_t msg, void *user)
Callback function for processing nmsg messages.