nmsg  0.13.2
private.h
1 /*
2  * Copyright (c) 2008-2015 by Farsight Security, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef NMSG_PRIVATE_H
18 #define NMSG_PRIVATE_H
19 
20 #include "nmsg_port_net.h"
21 
22 #ifdef HAVE_ENDIAN_H
23 # include <endian.h>
24 #else
25 # ifdef HAVE_SYS_ENDIAN_H
26 # include <sys/endian.h>
27 # endif
28 #endif
29 
30 #include <sys/types.h>
31 #include <sys/socket.h>
32 #include <sys/time.h>
33 #include <sys/stat.h>
34 #include <assert.h>
35 #include <ctype.h>
36 #include <errno.h>
37 #include <fcntl.h>
38 #include <inttypes.h>
39 #include <limits.h>
40 #include <pthread.h>
41 #include <poll.h>
42 #include <signal.h>
43 #include <stdarg.h>
44 #include <stdbool.h>
45 #include <stddef.h>
46 #include <stdio.h>
47 #include <stdint.h>
48 #include <stdlib.h>
49 #include <string.h>
50 #include <strings.h>
51 #include <time.h>
52 #include <unistd.h>
53 
54 #include <zlib.h>
55 
56 #include <protobuf-c/protobuf-c.h>
57 
58 #ifdef HAVE_LIBXS
59 # include <xs/xs.h>
60 #endif /* HAVE_LIBXS */
61 
62 #ifdef HAVE_YAJL
63 #include <yajl/yajl_gen.h>
64 #include <yajl/yajl_tree.h>
65 #endif /* HAVE_YAJL */
66 
67 #include "nmsg.h"
68 #include "nmsg.pb-c.h"
69 
70 #include "fltmod_plugin.h"
71 #include "msgmod_plugin.h"
72 #include "ipreasm.h"
73 
74 #include "libmy/crc32c.h"
75 #include "libmy/list.h"
76 #include "libmy/tree.h"
77 #include "libmy/ubuf.h"
78 #include "libmy/b64_decode.h"
79 #include "libmy/b64_encode.h"
80 #include "libmy/vector.h"
81 
82 /* Macros. */
83 
/*
 * Stringification helpers: STR() turns its argument into a string literal
 * exactly as written; XSTR() lets the argument be macro-expanded first and
 * then stringifies the result.
 */
84 #define STR(x) #x
85 #define XSTR(x) STR(x)
86 
/*
 * Intervals named for sequence-source and fragment garbage collection.
 * NOTE(review): units are presumably seconds -- confirm against the users.
 */
87 #define NMSG_SEQSRC_GC_INTERVAL 120
88 #define NMSG_FRAG_GC_INTERVAL 30
/* Number of nanoseconds in one second. */
89 #define NMSG_NSEC_PER_SEC 1000000000
90 
/*
 * Module name prefixes: a fixed string literal concatenated with the
 * stringified expansion of the corresponding *_VERSION macro, which is
 * defined outside this header.
 */
91 #define NMSG_FLT_MODULE_PREFIX "nmsg_flt" XSTR(NMSG_FLTMOD_VERSION)
92 #define NMSG_MSG_MODULE_PREFIX "nmsg_msg" XSTR(NMSG_MSGMOD_VERSION)
93 
/* Module file name suffix. */
94 #define NMSG_MODULE_SUFFIX ".so"
95 
/*
 * Print a printf-style message to stderr when the global debug level
 * (_nmsg_global_debug) is greater than or equal to `level`.
 *
 * The do { } while (0) wrapper makes the expansion usable as a single
 * statement. `##__VA_ARGS__` is the GNU extension that removes the
 * preceding comma when no variadic arguments are supplied.
 */
96 #define _nmsg_dprintf(level, format, ...) \
97 do { \
98  if (_nmsg_global_debug >= (level)) \
99  fprintf(stderr, format, ##__VA_ARGS__); \
100 } while (0)
101 
/*
 * Same as _nmsg_dprintf(), except that the debug level to compare against
 * is supplied by the caller as `var` instead of being read from
 * _nmsg_global_debug: the message is printed to stderr when
 * (var) >= (level).
 */
102 #define _nmsg_dprintfv(var, level, format, ...) \
103 do { \
104  if ((var) >= (level)) \
105  fprintf(stderr, format, ##__VA_ARGS__); \
106 } while (0)
107 
108 /* Enums. */
109 
/*
 * Discriminator for stream inputs/outputs; stored in the `type` member of
 * the stream structures declared later in this header.
 * NOTE(review): the per-value meanings below are inferred from the
 * enumerator names -- confirm against the stream implementations.
 */
110 typedef enum {
111  nmsg_stream_type_file, /* presumably a file-backed stream */
112  nmsg_stream_type_sock, /* presumably a socket-backed stream */
113  nmsg_stream_type_xs, /* presumably a libxs stream (see HAVE_LIBXS) */
114  nmsg_stream_type_null, /* presumably a stream with no backing descriptor */
115 } nmsg_stream_type;
116 
117 /*
 * Forward declarations. These incomplete struct types allow pointers to
 * them to be used in the typedefs, structures and prototypes below before
 * (or without) the full definition being visible.
 */
118 
119 struct nmsg_brate;
120 struct nmsg_buf;
121 struct nmsg_container;
122 struct nmsg_dlmod;
123 struct nmsg_frag;
124 struct nmsg_frag_key;
125 struct nmsg_frag_tree;
126 struct nmsg_input;
127 struct nmsg_json;
128 struct nmsg_output;
129 struct nmsg_msgmod;
130 struct nmsg_msgmod_field;
131 struct nmsg_msgmod_clos;
132 struct nmsg_pcap;
133 struct nmsg_pres;
134 struct nmsg_stream_input;
135 struct nmsg_stream_output;
136 struct nmsg_seqsrc;
137 struct nmsg_seqsrc_key;
138 
139 /* Globals. */
140 
/*
 * Library-wide state, declared here and defined in some other translation
 * unit (not visible in this header).
 */
/* NOTE(review): presumably controls automatic closing of resources -- confirm. */
141 extern bool _nmsg_global_autoclose;
/* Debug verbosity threshold consulted by _nmsg_dprintf(). */
142 extern int _nmsg_global_debug;
/* NOTE(review): presumably the process-wide set of loaded message modules -- confirm. */
143 extern struct nmsg_msgmodset * _nmsg_global_msgmodset;
144 
145 /* Function types. */
146 
/*
 * Function pointer types stored in the input/output objects declared below,
 * so that each object carries its own read/write implementation. All of
 * them return an nmsg_res status code.
 */
/* Type of struct nmsg_input's read_fp: takes an input and an nmsg_message_t out-pointer. */
147 typedef nmsg_res (*nmsg_input_read_fp)(struct nmsg_input *, nmsg_message_t *);
/*
 * Type of struct nmsg_input's read_loop_fp: takes a message callback and an
 * opaque pointer.
 * NOTE(review): the int argument is presumably a message count limit --
 * confirm against the implementations.
 */
148 typedef nmsg_res (*nmsg_input_read_loop_fp)(struct nmsg_input *, int,
149  nmsg_cb_message, void *);
/* Type of the stream input's stream_read_fp: takes an Nmsg__Nmsg out-pointer. */
150 typedef nmsg_res (*nmsg_input_stream_read_fp)(struct nmsg_input *, Nmsg__Nmsg **);
/* Type of struct nmsg_output's write_fp: takes an output and a message. */
151 typedef nmsg_res (*nmsg_output_write_fp)(struct nmsg_output *, nmsg_message_t);
/* Type of struct nmsg_output's flush_fp. */
152 typedef nmsg_res (*nmsg_output_flush_fp)(struct nmsg_output *);
153 
154 /* Data types. */
155 
156 /* nmsg_seqsrc */
158  uint64_t sequence_id;
159  sa_family_t af;
160  uint16_t port;
161  union {
162  uint8_t ip4[4];
163  uint8_t ip6[16];
164  };
165 };
166 
/*
 * Per-source sequence tracking record. Records are chained through `link`;
 * the stream input structure in this header keeps an ISC_LIST of them.
 * NOTE(review): field meanings below are inferred from names and types --
 * confirm against input_seqsrc.c.
 */
167 struct nmsg_seqsrc {
168  ISC_LINK(struct nmsg_seqsrc) link; /* list linkage */
169  struct nmsg_seqsrc_key key; /* identifies the source */
170  uint32_t sequence; /* presumably the sequence number being tracked */
171  uint64_t sequence_id; /* presumably the sequence identifier being tracked */
172  uint64_t count; /* presumably a received counter */
173  uint64_t count_dropped; /* presumably a dropped counter */
174  time_t last; /* presumably time of last activity */
175  bool init; /* presumably set once the record is initialized */
176  char addr_str[INET6_ADDRSTRLEN]; /* buffer sized for a textual IPv6 address */
177 };
178 
179 /* nmsg_frag: used by nmsg_stream_input */
181  uint32_t id;
182  uint32_t crc;
183  struct sockaddr_storage addr_ss;
184 };
185 
/*
 * One tree entry (RB_ENTRY linkage) holding fragment data, keyed by a
 * struct nmsg_frag_key.
 * NOTE(review): field meanings below are inferred from names and types --
 * confirm against input_frag.c.
 */
186 struct nmsg_frag {
187  RB_ENTRY(nmsg_frag) link; /* tree linkage */
188  struct nmsg_frag_key key; /* lookup key */
189  unsigned last; /* presumably index of the last fragment */
190  unsigned rem; /* presumably number of fragments still missing */
191  struct timespec ts; /* timestamp; presumably used for expiry */
192  ProtobufCBinaryData *frags; /* array of fragment payloads */
193 };
194 
195 /* nmsg_frag_tree: used by nmsg_stream_input */
197  RB_HEAD(frag_ent, nmsg_frag) head;
198 };
199 
200 /*
 * nmsg_buf: used by nmsg_stream_input, nmsg_stream_output.
 *
 * A byte buffer associated with a file descriptor. `data`, `pos` and `end`
 * all point into the same allocation, as described on each member.
 */
201 struct nmsg_buf {
202  int fd; /* associated file descriptor */
203  size_t bufsz; /* NOTE(review): presumably the allocated size of data -- confirm */
204  u_char *data; /* allocated data starts here */
205  u_char *pos; /* position of next buffer read */
206  u_char *end; /* one byte beyond valid data */
207 };
208 
209 /*
 * nmsg_pcap: used by nmsg_input.
 *
 * State for a pcap-based input. pcap_t and struct bpf_program are libpcap
 * types; this header does not include the pcap headers directly, so they
 * presumably arrive through nmsg.h.
 * NOTE(review): field meanings below are inferred from names and types --
 * confirm against input_pcap.c.
 */
210 struct nmsg_pcap {
211  int datalink; /* presumably the pcap datalink type */
212  pcap_t *handle; /* pcap capture handle */
213  struct _nmsg_ipreasm *reasm; /* IP reassembly state */
214  u_char *new_pkt; /* presumably a buffer for reassembled packets */
215 
216  pcap_t *user; /* second pcap handle; presumably for the user filter */
217  char *userbpft; /* presumably the user filter text */
218  struct bpf_program userbpf; /* presumably the compiled user filter */
219 
220  nmsg_pcap_type type;
221  bool raw; /* presumably selects raw packet delivery */
222 };
223 
224 /*
 * nmsg_pres: used by nmsg_input and nmsg_output.
 *
 * State for a presentation-format stream backed by a stdio FILE.
 * NOTE(review): field meanings below are inferred from names and types --
 * confirm against input_pres.c / output_pres.c.
 */
225 struct nmsg_pres {
226  pthread_mutex_t lock; /* mutex; presumably serializes access to fp */
227  FILE *fp; /* underlying stdio stream */
228  bool flush; /* presumably: flush after each write */
229  char *endline; /* presumably the line terminator string */
230 };
231 
232 /*
 * nmsg_json: used by nmsg_input and nmsg_output.
 *
 * State for a JSON-format stream backed by a stdio FILE.
 * NOTE(review): field meanings below are inferred from names and types --
 * confirm against input_json.c / output_json.c.
 */
233 struct nmsg_json {
234 #ifdef HAVE_YAJL
/* NOTE(review): this conditional section is empty in this listing; verify
 * against the upstream source that no member is missing. */
235 #endif /* HAVE_YAJL */
236  pthread_mutex_t lock; /* mutex; presumably serializes access to fp */
237  FILE *fp; /* underlying stdio stream */
238  int orig_fd; /* presumably the descriptor fp was created from */
239  bool flush; /* presumably: flush after each write */
240 };
241 
242 /* nmsg_stream_input: used by nmsg_input */
244  nmsg_stream_type type;
245  struct nmsg_buf *buf;
246 #ifdef HAVE_LIBXS
247  void *xs;
248 #endif /* HAVE_LIBXS */
249  Nmsg__Nmsg *nmsg;
250  unsigned np_index;
251  size_t nc_size;
252  struct nmsg_frag_tree nft;
253  struct pollfd pfd;
254  struct timespec now;
255  struct timespec lastgc;
256  unsigned nfrags;
257  unsigned flags;
258  nmsg_zbuf_t zb;
259  u_char *zb_tmp;
260  unsigned source;
261  unsigned operator;
262  unsigned group;
263  bool blocking_io;
264  bool verify_seqsrc;
265  struct nmsg_brate *brate;
266  ISC_LIST(struct nmsg_seqsrc) seqsrcs;
267  struct sockaddr_storage addr_ss;
268  uint64_t count_recv;
269  uint64_t count_drop;
270 
271  nmsg_input_stream_read_fp stream_read_fp;
272 };
273 
274 /* nmsg_stream_output: used by nmsg_output */
276  pthread_mutex_t lock;
277  nmsg_stream_type type;
278  int fd;
279 #ifdef HAVE_LIBXS
280  void *xs;
281 #endif /* HAVE_LIBXS */
282  nmsg_container_t c;
283  size_t bufsz;
284  nmsg_random_t random;
285  nmsg_rate_t rate;
286  bool buffered;
287  unsigned source;
288  unsigned operator;
289  unsigned group;
290  bool do_zlib;
291  bool do_sequence;
292  uint32_t sequence;
293  uint64_t sequence_id;
294 };
295 
296 /* nmsg_callback_output: used by nmsg_output */
298  nmsg_cb_message cb;
299  void *user;
300 };
301 
302 /* nmsg_callback_input: used by nmsg_input */
305  void *user;
306 };
307 
308 /*
 * nmsg_input: an input object. `type` identifies the kind of input, the
 * anonymous union holds a pointer to the type-specific state, and
 * read_fp / read_loop_fp hold the read implementations for this input.
 * NOTE(review): which union member is valid is presumably determined by
 * `type` -- confirm against input.c.
 */
309 struct nmsg_input {
310  nmsg_input_type type;
311  nmsg_msgmod_t msgmod; /* message module associated with this input */
312  void *clos; /* opaque pointer; presumably the msgmod closure */
313  union {
314  struct nmsg_stream_input *stream;
315  struct nmsg_pcap *pcap;
316  struct nmsg_pres *pres;
317  struct nmsg_json *json;
318  struct nmsg_callback_input *callback;
319  };
320  nmsg_input_read_fp read_fp; /* reads a single message */
321  nmsg_input_read_loop_fp read_loop_fp; /* callback-driven read loop */
322 
323  bool do_filter; /* presumably enables the vid/msgtype filter below */
324  unsigned filter_vid;
325  unsigned filter_msgtype;
326  volatile bool stop; /* stop flag; volatile, presumably set asynchronously */
327 };
328 
329 /*
 * nmsg_output: an output object. `type` identifies the kind of output, the
 * anonymous union holds a pointer to the type-specific state, and
 * write_fp / flush_fp hold the write and flush implementations.
 * NOTE(review): which union member is valid is presumably determined by
 * `type` -- confirm against output.c.
 */
330 struct nmsg_output {
331  nmsg_output_type type;
332  union {
333  struct nmsg_stream_output *stream;
334  struct nmsg_pres *pres;
335  struct nmsg_json *json;
336  struct nmsg_callback_output *callback;
337  };
338  nmsg_output_write_fp write_fp; /* writes one message */
339  nmsg_output_flush_fp flush_fp; /* flushes the output */
340 
341  bool do_filter; /* presumably enables the vid/msgtype filter below */
342  unsigned filter_vid;
343  unsigned filter_msgtype;
344  volatile bool stop; /* stop flag; volatile, presumably set asynchronously */
345 };
346 
347 /*
 * nmsg_message: a message object, holding both a protobuf-c message
 * pointer and an Nmsg__NmsgPayload pointer plus bookkeeping.
 * NOTE(review): field meanings below are inferred from names and types --
 * confirm against message.c.
 */
348 struct nmsg_message {
349  nmsg_msgmod_t mod; /* message module for this message */
350  ProtobufCMessage *message; /* protobuf-c message form */
351  Nmsg__NmsgPayload *np; /* payload form */
352  void *msg_clos; /* opaque per-message pointer */
353  size_t n_allocs; /* presumably the number of entries in allocs */
354  void **allocs; /* presumably allocations owned by this message */
355  bool updated; /* presumably set when message differs from np */
356 };
357 
385 /* dlmod / msgmod / msgmodset */
386 
/*
 * A dynamically loaded module record, chained through `link`.
 * NOTE(review): `handle` is presumably the dlopen() handle and `path` the
 * file it was loaded from -- confirm against dlmod.c.
 */
387 struct nmsg_dlmod {
388  ISC_LINK(struct nmsg_dlmod) link; /* list linkage */
389  char *path;
390  void *handle;
391 };
392 
/*
 * Mode value stored in the `mode` member of the message module closure.
 * NOTE(review): the names suggest key/value versus multi-line parsing
 * states -- confirm against the msgmod presentation parser.
 */
393 typedef enum nmsg_msgmod_clos_mode {
394  nmsg_msgmod_clos_m_keyval,
395  nmsg_msgmod_clos_m_multiline
396 } nmsg_msgmod_clos_mode;
397 
399  char *nmsg_pbuf;
400  size_t estsz;
401  nmsg_msgmod_clos_mode mode;
402  struct nmsg_msgmod_field *field;
403  struct nmsg_strbuf *strbufs;
404  void *mod_clos;
405 };
406 
408  struct nmsg_msgmod **msgtypes;
409  char *vname;
410  size_t nm;
411 };
412 
/*
 * A message module instance wrapping a plugin descriptor.
 * NOTE(review): field meanings below are inferred from names and types --
 * confirm against msgmod.c.
 */
413 struct nmsg_msgmod {
414  struct nmsg_msgmod_plugin *plugin; /* plugin this module wraps */
415  struct nmsg_msgmod_field *fields; /* field descriptor array */
416  struct nmsg_msgmod_field **fields_idx; /* presumably an index over fields */
417  size_t n_fields; /* presumably the number of fields */
418 };
419 
421  ISC_LIST(struct nmsg_dlmod) dlmods;
422  struct nmsg_msgvendor **vendors;
423  size_t nv;
424 };
425 
426 /* Prototypes. */
427 
428 /*
 * from alias.c
 *
 * Init/fini pair taking no arguments; init reports its status as an
 * nmsg_res, fini returns nothing.
 */
429 
430 nmsg_res _nmsg_alias_init(void);
431 void _nmsg_alias_fini(void);
432 
433 /*
 * from buf.c
 *
 * Helpers operating on struct nmsg_buf. _nmsg_buf_destroy() takes a
 * pointer to the caller's pointer.
 * NOTE(review): presumably so it can clear the caller's pointer after
 * freeing, and avail/used presumably report byte counts derived from the
 * data/pos/end pointers -- confirm against buf.c.
 */
434 
435 ssize_t _nmsg_buf_avail(struct nmsg_buf *buf);
436 ssize_t _nmsg_buf_used(struct nmsg_buf *buf);
437 struct nmsg_buf * _nmsg_buf_new(size_t sz);
438 void _nmsg_buf_destroy(struct nmsg_buf **buf);
439 void _nmsg_buf_reset(struct nmsg_buf *buf);
440 
441 /*
 * from dlmod.c
 *
 * Create a struct nmsg_dlmod from a path, and destroy one through a
 * pointer to the caller's pointer.
 * NOTE(review): failure is presumably signalled by a NULL return --
 * confirm against dlmod.c.
 */
442 
443 struct nmsg_dlmod * _nmsg_dlmod_init(const char *path);
444 void _nmsg_dlmod_destroy(struct nmsg_dlmod **dlmod);
445 
446 /*
 * from msgmod.c
 *
 * Create a struct nmsg_msgmod from a plugin descriptor, and stop/release
 * one through a pointer to the caller's pointer.
 */
447 
448 struct nmsg_msgmod * _nmsg_msgmod_start(struct nmsg_msgmod_plugin *plugin);
449 void _nmsg_msgmod_stop(struct nmsg_msgmod **mod);
450 
451 /*
 * from message.c
 *
 * Internal helpers for struct nmsg_message: initialization, conversion
 * between the protobuf message and payload forms, and duplication.
 * NOTE(review): the exact direction and ownership semantics of each helper
 * are not visible from this header -- confirm against message.c.
 */
452 
453 nmsg_res _nmsg_message_init_message(struct nmsg_message *msg);
454 nmsg_res _nmsg_message_init_payload(struct nmsg_message *msg);
455 nmsg_res _nmsg_message_deserialize(struct nmsg_message *msg);
456 nmsg_res _nmsg_message_serialize(struct nmsg_message *msg);
457 nmsg_message_t _nmsg_message_from_payload(Nmsg__NmsgPayload *np);
458 nmsg_message_t _nmsg_message_dup(struct nmsg_message *msg);
/* Does not modify msg (const); the result is returned through *dst. */
459 nmsg_res _nmsg_message_dup_protobuf(const struct nmsg_message *msg, ProtobufCMessage **dst);
460 
461 /*
 * from msgmodset.c
 *
 * Create a message module set from a path, and destroy one through a
 * pointer to the caller's pointer.
 * NOTE(review): `path` is presumably the module directory -- confirm.
 */
462 
463 struct nmsg_msgmodset * _nmsg_msgmodset_init(const char *path);
464 void _nmsg_msgmodset_destroy(struct nmsg_msgmodset **);
465 
466 /*
 * from payload.c
 *
 * Helpers operating on a whole container (Nmsg__Nmsg) or on a single
 * payload (Nmsg__NmsgPayload). _nmsg_payload_size() does not modify its
 * argument (const).
 */
467 void _nmsg_payload_free_all(Nmsg__Nmsg *nc);
468 void _nmsg_payload_calc_crcs(Nmsg__Nmsg *nc);
469 void _nmsg_payload_free(Nmsg__NmsgPayload **np);
470 size_t _nmsg_payload_size(const Nmsg__NmsgPayload *np);
471 
472 /*
 * from input_frag.c
 *
 * Fragment handling for stream inputs. _input_frag_read() takes a buffer
 * and its length and returns a container through the Nmsg__Nmsg **
 * argument.
 * NOTE(review): destroy/gc presumably release all / expired fragment state
 * respectively -- confirm against input_frag.c.
 */
473 nmsg_res _input_frag_read(nmsg_input_t, Nmsg__Nmsg **, uint8_t *buf, size_t buf_len);
474 void _input_frag_destroy(struct nmsg_stream_input *);
475 void _input_frag_gc(struct nmsg_stream_input *);
476 
477 /*
 * from input_nmsg.c
 *
 * NMSG stream input: filtering, single-message read, read loop, container
 * unpacking and per-transport container readers. The _xs reader is only
 * declared when HAVE_LIBXS is defined.
 * NOTE(review): the meaning of the unnamed unsigned/ssize_t parameters is
 * not visible from this header -- confirm against input_nmsg.c.
 */
478 bool _input_nmsg_filter(nmsg_input_t, unsigned, Nmsg__NmsgPayload *);
479 nmsg_res _input_nmsg_read(nmsg_input_t, nmsg_message_t *);
480 nmsg_res _input_nmsg_loop(nmsg_input_t, int, nmsg_cb_message, void *);
481 nmsg_res _input_nmsg_unpack_container(nmsg_input_t, Nmsg__Nmsg **, uint8_t *, size_t);
482 nmsg_res _input_nmsg_unpack_container2(const uint8_t *, size_t, unsigned, Nmsg__Nmsg **);
483 nmsg_res _input_nmsg_read_container_file(nmsg_input_t, Nmsg__Nmsg **);
484 nmsg_res _input_nmsg_read_container_sock(nmsg_input_t, Nmsg__Nmsg **);
485 #ifdef HAVE_LIBXS
486 nmsg_res _input_nmsg_read_container_xs(nmsg_input_t, Nmsg__Nmsg **);
487 #endif /* HAVE_LIBXS */
488 nmsg_res _input_nmsg_deserialize_header(const uint8_t *, size_t, ssize_t *, unsigned *);
489 
490 /*
 * from input_callback.c
 *
 * The readers in this and the following sections share the
 * (nmsg_input_t, nmsg_message_t *) signature of nmsg_input_read_fp; the
 * _loop_null function matches nmsg_input_read_loop_fp.
 */
491 nmsg_res _input_nmsg_read_callback(nmsg_input_t, nmsg_message_t *);
492 
493 /* from input_nullnmsg.c */
494 nmsg_res _input_nmsg_read_null(nmsg_input_t, nmsg_message_t *);
495 nmsg_res _input_nmsg_loop_null(nmsg_input_t, int, nmsg_cb_message, void *);
496 
497 /* from input_pcap.c */
498 nmsg_res _input_pcap_read(nmsg_input_t, nmsg_message_t *);
499 nmsg_res _input_pcap_read_raw(nmsg_input_t, nmsg_message_t *);
500 
501 /* from input_pres.c */
502 nmsg_res _input_pres_read(nmsg_input_t, nmsg_message_t *);
503 
504 /* from input_json.c */
505 nmsg_res _input_json_read(nmsg_input_t, nmsg_message_t *);
506 
507 /*
 * from input_seqsrc.c
 *
 * Sequence-source tracking for an input, operating on struct nmsg_seqsrc
 * records and containers.
 * NOTE(review): the size_t returned by _input_seqsrc_update() is
 * presumably a count of dropped containers -- confirm against
 * input_seqsrc.c.
 */
508 struct nmsg_seqsrc * _input_seqsrc_get(nmsg_input_t, Nmsg__Nmsg *);
509 void _input_seqsrc_destroy(nmsg_input_t);
510 size_t _input_seqsrc_update(nmsg_input_t, struct nmsg_seqsrc *, Nmsg__Nmsg *);
511 
512 /*
 * from output.c
 *
 * The output-side helpers in this and the following sections all take an
 * nmsg_output_t. The writers taking (nmsg_output_t, nmsg_message_t) match
 * nmsg_output_write_fp and _output_nmsg_flush() matches
 * nmsg_output_flush_fp. The _xs writer is only declared when HAVE_LIBXS is
 * defined.
 */
513 void _output_stop(nmsg_output_t);
514 
515 /* from output_frag.c */
516 nmsg_res _output_frag_write(nmsg_output_t);
517 
518 /* from output_nmsg.c */
519 nmsg_res _output_nmsg_flush(nmsg_output_t);
520 nmsg_res _output_nmsg_write(nmsg_output_t, nmsg_message_t);
521 nmsg_res _output_nmsg_write_container(nmsg_output_t);
522 nmsg_res _output_nmsg_write_sock(nmsg_output_t, uint8_t *buf, size_t len);
523 nmsg_res _output_nmsg_write_file(nmsg_output_t, uint8_t *buf, size_t len);
524 #ifdef HAVE_LIBXS
525 nmsg_res _output_nmsg_write_xs(nmsg_output_t, uint8_t *buf, size_t len);
526 #endif /* HAVE_LIBXS */
527 
528 /* from output_pres.c */
529 nmsg_res _output_pres_write(nmsg_output_t, nmsg_message_t);
530 
531 /* from output_json.c */
532 nmsg_res _output_json_write(nmsg_output_t, nmsg_message_t);
533 
534 /*
 * from brate.c
 *
 * Byte-rate limiter: created from a target byte rate, destroyed through a
 * pointer to the caller's pointer.
 * NOTE(review): _nmsg_brate_sleep() presumably delays the caller so that
 * the target rate is respected; how container_sz, n_payloads and n feed
 * into that is not visible here -- confirm against brate.c.
 */
535 struct nmsg_brate * _nmsg_brate_init(size_t target_byte_rate);
536 void _nmsg_brate_destroy(struct nmsg_brate **);
537 void _nmsg_brate_sleep(struct nmsg_brate *, size_t container_sz, size_t n_payloads, size_t n);
538 
539 /* from ipdg.c */
540 
/*
 * Parse a packet into an nmsg_ipdg, with access to IP reassembly state.
 *
 * Inputs are the packet (`pkt`, `len`), `etype`, the reassembly context
 * `reasm` and a `timestamp`; `dg`, `new_len`, `new_pkt` and `defrag` are
 * non-const pointers and so can be written by the callee.
 * NOTE(review): presumably new_pkt/new_len receive the reassembled
 * datagram and *defrag reports whether reassembly took place; the units of
 * `timestamp` are not visible here -- confirm against ipdg.c.
 */
585 nmsg_res
586 _nmsg_ipdg_parse_reasm(struct nmsg_ipdg *dg, unsigned etype, size_t len,
587  const u_char *pkt, struct _nmsg_ipreasm *reasm,
588  unsigned *new_len, u_char *new_pkt, int *defrag,
589  uint64_t timestamp);
590 
591 #endif /* NMSG_PRIVATE_H */