add libipfix
[openwrt/staging/florian.git] / package / libipfix / patches / 100-openimp_sync.patch
1 --- a/lib/ipfix.c
2 +++ b/lib/ipfix.c
3 @@ -37,6 +37,9 @@ $$LIC$$
4 #ifdef SCTPSUPPORT
5 #include <netinet/sctp.h>
6 #endif
7 +#ifndef NOTHREADS
8 +#include <pthread.h>
9 +#endif
10 #include <fcntl.h>
11 #include <netdb.h>
12
13 @@ -123,6 +126,18 @@ static uint16_t g_lasttid;
14 static ipfix_datarecord_t g_data = { NULL, NULL, 0 }; /* ipfix_export */
15
16 static ipfix_field_t *g_ipfix_fields;
17 +#ifndef NOTHREADS
18 +static pthread_mutex_t g_mutex;
19 +#define mod_lock() { \
20 + if ( pthread_mutex_lock( &g_mutex ) !=0 ) \
21 + mlogf( 0, "[ipfix] mutex_lock() failed: %s\n", \
22 + strerror( errno ) ); \
23 + }
24 +#define mod_unlock() { pthread_mutex_unlock( &g_mutex ); }
25 +#else
26 +#define mod_lock()
27 +#define mod_unlock()
28 +#endif
29
30 /*----- prototypes -------------------------------------------------------*/
31
32 @@ -133,6 +148,7 @@ int _ipfix_send_message( ipfix_t *ifh,
33 ipfix_message_t *message );
34 int _ipfix_write_msghdr( ipfix_t *ifh, ipfix_message_t *msg, iobuf_t *buf );
35 void _ipfix_disconnect( ipfix_collector_t *col );
36 +int _ipfix_export_flush( ipfix_t *ifh );
37
38
39 /* name : do_writeselect
40 @@ -576,16 +592,18 @@ int ipfix_decode_float( void *in, void *
41
42 int ipfix_snprint_float( char *str, size_t size, void *data, size_t len )
43 {
44 - float tmp32;
45 - double tmp64;
46 + uint32_t tmp32;
47 + uint64_t tmp64;
48
49 switch ( len ) {
50 case 4:
51 - ipfix_decode_float( data, &tmp32, 4);
52 - return snprintf( str, size, "%f", tmp32 );
53 + memcpy( &tmp32, data, len );
54 + tmp32 = htonl( tmp32 );
55 + return snprintf( str, size, "%f", (float)tmp32 );
56 case 8:
57 - ipfix_decode_float( data, &tmp64, 8);
58 - return snprintf( str, size, "%lf", tmp64);
59 + memcpy( &tmp64, data, len );
60 + tmp64 = HTONLL( tmp64 );
61 + return snprintf( str, size, "%lf", (double)tmp64 );
62 default:
63 break;
64 }
65 @@ -682,12 +700,19 @@ int ipfix_get_eno_ieid( char *field, int
66 * parameters:
67 * remarks: init module, read field type info.
68 */
69 -int ipfix_init ( void )
70 +int ipfix_init( void )
71 {
72 if ( g_tstart ) {
73 ipfix_cleanup();
74 }
75
76 +#ifndef NOTHREADS
77 + if ( pthread_mutex_init( &g_mutex, NULL ) !=0 ) {
78 + mlogf( 0, "[ipfix] pthread_mutex_init() failed: %s\n",
79 + strerror(errno) );
80 + return -1;
81 + }
82 +#endif
83 g_tstart = time(NULL);
84 signal( SIGPIPE, SIG_IGN );
85 g_lasttid = 255;
86 @@ -806,6 +831,9 @@ void ipfix_cleanup ( void )
87 g_data.maxfields = 0;
88 g_data.lens = NULL;
89 g_data.addrs = NULL;
90 +#ifndef NOTHREADS
91 + (void)pthread_mutex_destroy( &g_mutex );
92 +#endif
93 }
94
95 int _ipfix_connect ( ipfix_collector_t *col )
96 @@ -1465,7 +1493,7 @@ int _ipfix_write_template( ipfix_t
97 default:
98 /* check space */
99 if ( tsize+ifh->offset > IPFIX_DEFAULT_BUFLEN ) {
100 - if ( ipfix_export_flush( ifh ) < 0 )
101 + if ( _ipfix_export_flush( ifh ) < 0 )
102 return -1;
103 if ( tsize+ifh->offset > IPFIX_DEFAULT_BUFLEN )
104 return -1;
105 @@ -1474,6 +1502,8 @@ int _ipfix_write_template( ipfix_t
106 /* write template prior to data */
107 if ( ifh->offset > 0 ) {
108 memmove( ifh->buffer + tsize, ifh->buffer, ifh->offset );
109 + if ( ifh->cs_tid )
110 + ifh->cs_header += tsize;
111 }
112
113 buf = ifh->buffer;
114 @@ -1615,8 +1645,11 @@ int ipfix_open( ipfix_t **ipfixh, int so
115 return -1;
116 }
117 node->ifh = i;
118 +
119 + mod_lock();
120 node->next = g_ipfixlist;
121 g_ipfixlist = node;
122 + mod_unlock();
123
124 *ipfixh = i;
125 return 0;
126 @@ -1633,7 +1666,8 @@ void ipfix_close( ipfix_t *h )
127 {
128 ipfix_node_t *l, *n;
129
130 - ipfix_export_flush( h );
131 + mod_lock();
132 + _ipfix_export_flush( h );
133
134 while( h->collectors )
135 _ipfix_drop_collector( (ipfix_collector_t**)&h->collectors );
136 @@ -1659,6 +1693,7 @@ void ipfix_close( ipfix_t *h )
137 #endif
138 free(h->buffer);
139 free(h);
140 + mod_unlock();
141 }
142 }
143
144 @@ -2156,6 +2191,22 @@ void ipfix_release_template( ipfix_t *if
145 ipfix_delete_template( ifh, templ );
146 }
147
148 +static void _finish_cs( ipfix_t *ifh )
149 +{
150 + size_t buflen;
151 + uint8_t *buf;
152 +
153 + /* finish current dataset */
154 + if ( (buf=ifh->cs_header) ==NULL )
155 + return;
156 + buflen = 0;
157 + INSERTU16( buf+buflen, buflen, ifh->cs_tid );
158 + INSERTU16( buf+buflen, buflen, ifh->cs_bytes );
159 + ifh->cs_bytes = 0;
160 + ifh->cs_header = NULL;
161 + ifh->cs_tid = 0;
162 +}
163 +
164 int ipfix_export( ipfix_t *ifh, ipfix_template_t *templ, ... )
165 {
166 int i;
167 @@ -2199,13 +2250,14 @@ int ipfix_export( ipfix_t *ifh, ipfix_te
168 g_data.addrs, g_data.lens );
169 }
170
171 -int ipfix_export_array( ipfix_t *ifh,
172 - ipfix_template_t *templ,
173 - int nfields,
174 - void **fields,
175 - uint16_t *lengths )
176 +static int
177 +_ipfix_export_array( ipfix_t *ifh,
178 + ipfix_template_t *templ,
179 + int nfields,
180 + void **fields,
181 + uint16_t *lengths )
182 {
183 - int i;
184 + int i, newset_f=0;
185 size_t buflen, datasetlen;
186 uint8_t *p, *buf;
187
188 @@ -2249,7 +2301,19 @@ int ipfix_export_array( ipfix_t
189
190 /** get size of data set, check space
191 */
192 - for ( i=0, datasetlen=4; i<nfields; i++ ) {
193 + if ( templ->tid == ifh->cs_tid ) {
194 + newset_f = 0;
195 + datasetlen = 0;
196 + }
197 + else {
198 + if ( ifh->cs_tid > 0 ) {
199 + _finish_cs( ifh );
200 + }
201 + newset_f = 1;
202 + datasetlen = 4;
203 + }
204 +
205 + for ( i=0; i<nfields; i++ ) {
206 if ( templ->fields[i].flength == IPFIX_FT_VARLEN ) {
207 if ( lengths[i]>254 )
208 datasetlen += 3;
209 @@ -2263,21 +2327,29 @@ int ipfix_export_array( ipfix_t
210 }
211 datasetlen += lengths[i];
212 }
213 - if ( ((ifh->offset + datasetlen) > IPFIX_DEFAULT_BUFLEN )
214 - && (ipfix_export_flush( ifh ) <0) ) {
215 - return -1;
216 +
217 + if ( (ifh->offset + datasetlen) > IPFIX_DEFAULT_BUFLEN ) {
218 + if ( ifh->cs_tid )
219 + _finish_cs( ifh );
220 + newset_f = 1;
221 +
222 + if ( _ipfix_export_flush( ifh ) <0 )
223 + return -1;
224 }
225
226 - /* fill buffer
227 - */
228 + /* fill buffer */
229 buf = (uint8_t*)(ifh->buffer) + ifh->offset;
230 buflen = 0;
231
232 - /* insert data set
233 - */
234 - ifh->nrecords ++;
235 - INSERTU16( buf+buflen, buflen, templ->tid );
236 - INSERTU16( buf+buflen, buflen, datasetlen );
237 + if ( newset_f ) {
238 + /* insert data set
239 + */
240 + ifh->cs_bytes = 0;
241 + ifh->cs_header = buf;
242 + ifh->cs_tid = templ->tid;
243 + INSERTU16( buf+buflen, buflen, templ->tid );
244 + INSERTU16( buf+buflen, buflen, 4 );
245 + }
246
247 /* insert data record
248 */
249 @@ -2303,7 +2375,9 @@ int ipfix_export_array( ipfix_t
250 buflen += lengths[i];
251 }
252
253 + ifh->nrecords ++;
254 ifh->offset += buflen;
255 + ifh->cs_bytes += buflen;
256 if ( ifh->version == IPFIX_VERSION )
257 ifh->seqno ++;
258 return 0;
259 @@ -2313,7 +2387,7 @@ int ipfix_export_array( ipfix_t
260 * parameters:
261 * remarks: rewrite this func!
262 */
263 -int ipfix_export_flush( ipfix_t *ifh )
264 +int _ipfix_export_flush( ipfix_t *ifh )
265 {
266 iobuf_t *buf;
267 ipfix_collector_t *col;
268 @@ -2322,8 +2396,14 @@ int ipfix_export_flush( ipfix_t *ifh )
269 if ( (ifh==NULL) || (ifh->offset==0) )
270 return 0;
271
272 - if ( (buf=_ipfix_getbuf()) ==NULL )
273 + if ( ifh->cs_tid > 0 ) {
274 + /* finish current dataset */
275 + _finish_cs( ifh );
276 + }
277 +
278 + if ( (buf=_ipfix_getbuf()) ==NULL ) {
279 return -1;
280 + }
281
282 #ifdef DEBUG
283 mlogf( 0, "[ipfix_export_flush] msg has %d records, %d bytes\n",
284 @@ -2350,3 +2430,30 @@ int ipfix_export_flush( ipfix_t *ifh )
285 _ipfix_freebuf( buf );
286 return ret;
287 }
288 +
289 +int ipfix_export_array( ipfix_t *ifh,
290 + ipfix_template_t *templ,
291 + int nfields,
292 + void **fields,
293 + uint16_t *lengths )
294 +{
295 + int ret;
296 +
297 + mod_lock();
298 + ret = _ipfix_export_array( ifh, templ, nfields, fields, lengths );
299 + mod_unlock();
300 +
301 + return ret;
302 +}
303 +
304 +int ipfix_export_flush( ipfix_t *ifh )
305 +{
306 + int ret;
307 +
308 + mod_lock();
309 + ret = _ipfix_export_flush( ifh );
310 + mod_unlock();
311 +
312 + return ret;
313 +}
314 +
315 --- a/lib/ipfix.h
316 +++ b/lib/ipfix.h
317 @@ -142,6 +142,12 @@ typedef struct
318 int nrecords; /* no. of records in buffer */
319 size_t offset; /* output buffer fill level */
320 uint32_t seqno; /* sequence no. of next message */
321 +
322 + /* experimental */
323 + int cs_tid; /* template id of current dataset */
324 + int cs_bytes; /* size of current set */
325 + uint8_t *cs_header; /* start of current set */
326 +
327 } ipfix_t;
328
329 /** exporter funcs
330 --- a/lib/ipfix_col.c
331 +++ b/lib/ipfix_col.c
332 @@ -907,7 +907,7 @@ int ipfix_decode_datarecord( ipfixt_node
333 return 0;
334 }
335
336 -static void do_free_datarecord( ipfix_datarecord_t *data )
337 +void ipfix_free_datarecord( ipfix_datarecord_t *data )
338 {
339 if ( data ) {
340 if ( data->addrs )
341 @@ -925,6 +925,7 @@ int ipfix_parse_msg( ipfix_input_t *inpu
342 ipfix_hdr_t hdr; /* ipfix packet header */
343 ipfixs_node_t *s;
344 ipfix_datarecord_t data = { NULL, NULL, 0 };
345 + ipfixe_node_t *e;
346 uint8_t *buf; /* ipfix payload */
347 uint16_t setid, setlen; /* set id, set lenght */
348 int i, nread, offset; /* counter */
349 @@ -1042,6 +1043,12 @@ int ipfix_parse_msg( ipfix_input_t *inpu
350 err_flag = 1;
351 }
352 else {
353 + for ( e=g_exporter; e!=NULL; e=e->next ) {
354 + if ( e->elem->export_dset )
355 + (void) e->elem->export_dset( t, buf+nread, setlen,
356 + e->elem->data );
357 + }
358 +
359 /** read data records
360 */
361 for ( offset=nread, bytesleft=setlen; bytesleft>4; ) {
362 @@ -1076,11 +1083,11 @@ int ipfix_parse_msg( ipfix_input_t *inpu
363 goto errend;
364
365 end:
366 - do_free_datarecord( &data );
367 + ipfix_free_datarecord( &data );
368 return nread;
369
370 errend:
371 - do_free_datarecord( &data );
372 + ipfix_free_datarecord( &data );
373 return -1;
374 }
375
376 @@ -1093,7 +1100,7 @@ void process_client_tcp( int fd, int mas
377 tcp_conn_t *tcon = (tcp_conn_t*)data;
378 char *func = "process_client_tcp";
379
380 - mlogf( 3, "[%s] fd %d mask %d called.\n", func, fd, mask );
381 + mlogf( 4, "[%s] fd %d mask %d called.\n", func, fd, mask );
382
383 /** read ipfix header
384 */
385 --- a/lib/ipfix_col.h
386 +++ b/lib/ipfix_col.h
387 @@ -88,6 +88,7 @@ typedef struct ipfix_col_info
388 int (*export_newsource)(ipfixs_node_t*,void*);
389 int (*export_newmsg)(ipfixs_node_t*,ipfix_hdr_t*,void*);
390 int (*export_trecord)(ipfixs_node_t*,ipfixt_node_t*,void*);
391 + int (*export_dset)(ipfixt_node_t*,uint8_t*,size_t,void*);
392 int (*export_drecord)(ipfixs_node_t*,ipfixt_node_t*,
393 ipfix_datarecord_t*,void*);
394 void (*export_cleanup)(void*);
395 --- a/lib/ipfix_col_files.c
396 +++ b/lib/ipfix_col_files.c
397 @@ -68,7 +68,7 @@ static int export_newsource_file( ipfixs
398 return -1;
399 }
400 snprintf( s->fname+strlen(s->fname), PATH_MAX-strlen(s->fname),
401 - "/%u", s->odid );
402 + "/%u", (unsigned int)s->odid );
403 if ( (access( s->fname, R_OK ) <0 )
404 && (mkdir( s->fname, S_IRWXU ) <0) ) {
405 mlogf( 0, "[%s] cannot access dir '%s': %s\n",