libdap  Updated for version 3.17.2
XDRStreamMarshaller.cc
1 // XDRStreamMarshaller.cc
2 
3 // -*- mode: c++; c-basic-offset:4 -*-
4 
5 // This file is part of libdap, A C++ implementation of the OPeNDAP Data
6 // Access Protocol.
7 
8 // Copyright (c) 2002,2003 OPeNDAP, Inc.
9 // Author: Patrick West <pwest@ucar.edu>
10 //
11 // This library is free software; you can redistribute it and/or
12 // modify it under the terms of the GNU Lesser General Public
13 // License as published by the Free Software Foundation; either
14 // version 2.1 of the License, or (at your option) any later version.
15 //
16 // This library is distributed in the hope that it will be useful,
17 // but WITHOUT ANY WARRANTY; without even the implied warranty of
18 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 // Lesser General Public License for more details.
20 //
21 // You should have received a copy of the GNU Lesser General Public
22 // License along with this library; if not, write to the Free Software
23 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
24 //
25 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
26 
27 // (c) COPYRIGHT URI/MIT 1994-1999
28 // Please read the full copyright statement in the file COPYRIGHT_URI.
29 //
30 // Authors:
31 // pwest Patrick West <pwest@ucar.edu>
32 
33 #include "config.h"
34 
35 #ifdef HAVE_PTHREAD_H
36 #include <pthread.h>
37 #endif
38 
39 #include <cassert>
40 
41 #include <iostream>
42 #include <sstream>
43 #include <iomanip>
44 
45 // #define DODS_DEBUG
46 
47 #include "XDRStreamMarshaller.h"
48 #ifdef USE_POSIX_THREADS
49 #include "MarshallerThread.h"
50 #endif
51 #include "Vector.h"
52 #include "XDRUtils.h"
53 #include "util.h"
54 
55 #include "debug.h"
56 
57 using namespace std;
58 
59 // Build this code so it does not use pthreads to write some kinds of
60 // data (see the put_vector() and put_vector_part() methods) in a child thread.
61 // #undef USE_POSIX_THREADS
62 
63 namespace libdap {
64 
65 char *XDRStreamMarshaller::d_buf = 0;
66 static const int XDR_DAP_BUFF_SIZE=256;
67 
68 
77 XDRStreamMarshaller::XDRStreamMarshaller(ostream &out) :
78  d_out(out), d_partial_put_byte_count(0), tm(0)
79 {
80  if (!d_buf) d_buf = (char *) malloc(XDR_DAP_BUFF_SIZE);
81  if (!d_buf) throw Error(internal_error, "Failed to allocate memory for data serialization.");
82 
83  xdrmem_create(&d_sink, d_buf, XDR_DAP_BUFF_SIZE, XDR_ENCODE);
84 
85 #ifdef USE_POSIX_THREADS
86  tm = new MarshallerThread;
87 #endif
88 }
89 
90 XDRStreamMarshaller::~XDRStreamMarshaller()
91 {
92  delete tm;
93 
94  xdr_destroy(&d_sink);
95 }
96 
97 void XDRStreamMarshaller::put_byte(dods_byte val)
98 {
99  if (!xdr_setpos(&d_sink, 0))
100  throw Error("Network I/O Error. Could not send byte data - unable to set stream position.");
101 
102  if (!xdr_char(&d_sink, (char *) &val))
103  throw Error(
104  "Network I/O Error. Could not send byte data.");
105 
106  unsigned int bytes_written = xdr_getpos(&d_sink);
107  if (!bytes_written)
108  throw Error(
109  "Network I/O Error. Could not send byte data - unable to get stream position.");
110 
111 #ifdef USE_POSIX_THREADS
112  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
113 #endif
114 
115  d_out.write(d_buf, bytes_written);
116 }
117 
118 void XDRStreamMarshaller::put_int16(dods_int16 val)
119 {
120  if (!xdr_setpos(&d_sink, 0))
121  throw Error(
122  "Network I/O Error. Could not send int 16 data - unable to set stream position.");
123 
124  if (!XDR_INT16(&d_sink, &val))
125  throw Error(
126  "Network I/O Error. Could not send int 16 data.");
127 
128  unsigned int bytes_written = xdr_getpos(&d_sink);
129  if (!bytes_written)
130  throw Error(
131  "Network I/O Error. Could not send int 16 data - unable to get stream position.");
132 
133 #ifdef USE_POSIX_THREADS
134  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
135 #endif
136 
137  d_out.write(d_buf, bytes_written);
138 }
139 
140 void XDRStreamMarshaller::put_int32(dods_int32 val)
141 {
142  if (!xdr_setpos(&d_sink, 0))
143  throw Error(
144  "Network I/O Error. Could not send int 32 data - unable to set stream position.");
145 
146  if (!XDR_INT32(&d_sink, &val))
147  throw Error(
148  "Network I/O Error. Culd not read int 32 data.");
149 
150  unsigned int bytes_written = xdr_getpos(&d_sink);
151  if (!bytes_written)
152  throw Error(
153  "Network I/O Error. Could not send int 32 data - unable to get stream position.");
154 
155 #ifdef USE_POSIX_THREADS
156  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
157 #endif
158 
159  d_out.write(d_buf, bytes_written);
160 }
161 
162 void XDRStreamMarshaller::put_float32(dods_float32 val)
163 {
164  if (!xdr_setpos(&d_sink, 0))
165  throw Error(
166  "Network I/O Error. Could not send float 32 data - unable to set stream position.");
167 
168  if (!xdr_float(&d_sink, &val))
169  throw Error(
170  "Network I/O Error. Could not send float 32 data.");
171 
172  unsigned int bytes_written = xdr_getpos(&d_sink);
173  if (!bytes_written)
174  throw Error(
175  "Network I/O Error. Could not send float 32 data - unable to get stream position.");
176 
177 #ifdef USE_POSIX_THREADS
178  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
179 #endif
180 
181  d_out.write(d_buf, bytes_written);
182 }
183 
184 void XDRStreamMarshaller::put_float64(dods_float64 val)
185 {
186  if (!xdr_setpos(&d_sink, 0))
187  throw Error(
188  "Network I/O Error. Could not send float 64 data - unable to set stream position.");
189 
190  if (!xdr_double(&d_sink, &val))
191  throw Error(
192  "Network I/O Error. Could not send float 64 data.");
193 
194  unsigned int bytes_written = xdr_getpos(&d_sink);
195  if (!bytes_written)
196  throw Error(
197  "Network I/O Error. Could not send float 64 data - unable to get stream position.");
198 
199 #ifdef USE_POSIX_THREADS
200  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
201 #endif
202 
203  d_out.write(d_buf, bytes_written);
204 }
205 
206 void XDRStreamMarshaller::put_uint16(dods_uint16 val)
207 {
208  if (!xdr_setpos(&d_sink, 0))
209  throw Error(
210  "Network I/O Error. Could not send uint 16 data - unable to set stream position.");
211 
212  if (!XDR_UINT16(&d_sink, &val))
213  throw Error(
214  "Network I/O Error. Could not send uint 16 data.");
215 
216  unsigned int bytes_written = xdr_getpos(&d_sink);
217  if (!bytes_written)
218  throw Error(
219  "Network I/O Error. Could not send uint 16 data - unable to get stream position.");
220 
221 #ifdef USE_POSIX_THREADS
222  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
223 #endif
224 
225  d_out.write(d_buf, bytes_written);
226 }
227 
228 void XDRStreamMarshaller::put_uint32(dods_uint32 val)
229 {
230  if (!xdr_setpos(&d_sink, 0))
231  throw Error(
232  "Network I/O Error. Could not send uint 32 data - unable to set stream position.");
233 
234  if (!XDR_UINT32(&d_sink, &val))
235  throw Error(
236  "Network I/O Error. Could not send uint 32 data.");
237 
238  unsigned int bytes_written = xdr_getpos(&d_sink);
239  if (!bytes_written)
240  throw Error(
241  "Network I/O Error. Could not send uint 32 data - unable to get stream position.");
242 
243 #ifdef USE_POSIX_THREADS
244  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
245 #endif
246 
247  d_out.write(d_buf, bytes_written);
248 }
249 
250 void XDRStreamMarshaller::put_str(const string &val)
251 {
252  int size = val.length() + 8;
253 
254  XDR str_sink;
255  vector<char> str_buf(size);
256 
257  try {
258  xdrmem_create(&str_sink, &str_buf[0], size, XDR_ENCODE);
259 
260  if (!xdr_setpos(&str_sink, 0))
261  throw Error(
262  "Network I/O Error. Could not send string data - unable to set stream position.");
263 
264  const char *out_tmp = val.c_str();
265  if (!xdr_string(&str_sink, (char **) &out_tmp, size))
266  throw Error(
267  "Network I/O Error. Could not send string data.");
268 
269  unsigned int bytes_written = xdr_getpos(&str_sink);
270  if (!bytes_written)
271  throw Error(
272  "Network I/O Error. Could not send string data - unable to get stream position.");
273 
274 #ifdef USE_POSIX_THREADS
275  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
276 #endif
277 
278  d_out.write(&str_buf[0], bytes_written);
279 
280  xdr_destroy(&str_sink);
281  }
282  catch (...) {
283  xdr_destroy(&str_sink);
284  throw;
285  }
286 }
287 
288 void XDRStreamMarshaller::put_url(const string &val)
289 {
290  put_str(val);
291 }
292 
293 void XDRStreamMarshaller::put_opaque(char *val, unsigned int len)
294 {
295  if (len > XDR_DAP_BUFF_SIZE)
296  throw Error("Network I/O Error. Could not send opaque data - length of opaque data larger than allowed");
297 
298  if (!xdr_setpos(&d_sink, 0))
299  throw Error(
300  "Network I/O Error. Could not send opaque data - unable to set stream position.");
301 
302  if (!xdr_opaque(&d_sink, val, len))
303  throw Error(
304  "Network I/O Error. Could not send opaque data.");
305 
306  unsigned int bytes_written = xdr_getpos(&d_sink);
307  if (!bytes_written)
308  throw Error(
309  "Network I/O Error. Could not send opaque data - unable to get stream position.");
310 
311 #ifdef USE_POSIX_THREADS
312  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
313 #endif
314 
315  d_out.write(d_buf, bytes_written);
316 }
317 
318 void XDRStreamMarshaller::put_int(int val)
319 {
320  if (!xdr_setpos(&d_sink, 0))
321  throw Error(
322  "Network I/O Error. Could not send int data - unable to set stream position.");
323 
324  if (!xdr_int(&d_sink, &val))
325  throw Error(
326  "Network I/O Error(1). Could not send int data.");
327 
328  unsigned int bytes_written = xdr_getpos(&d_sink);
329  if (!bytes_written)
330  throw Error(
331  "Network I/O Error. Could not send int data - unable to get stream position.");
332 
333 #ifdef USE_POSIX_THREADS
334  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
335 #endif
336 
337  d_out.write(d_buf, bytes_written);
338 }
339 
340 void XDRStreamMarshaller::put_vector(char *val, int num, int width, Vector &vec)
341 {
342  put_vector(val, num, width, vec.var()->type());
343 }
344 
345 
354 {
355  put_int(num);
356  put_int(num);
357 
358  d_partial_put_byte_count = 0;
359 }
360 
368 {
369 #ifdef USE_POSIX_THREADS
370  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
371 #endif
372 
373  // Compute the trailing (padding) bytes
374 
375  // Note that the XDR standard pads values to 4 byte boundaries.
376  //unsigned int pad = (d_partial_put_byte_count % 4) == 0 ? 0: 4 - (d_partial_put_byte_count % 4);
377  unsigned int mod_4 = d_partial_put_byte_count & 0x03;
378  unsigned int pad = (mod_4 == 0) ? 0: 4 - mod_4;
379 
380  if (pad) {
381  vector<char> padding(4, 0); // 4 zeros
382 
383  d_out.write(&padding[0], pad);
384  if (d_out.fail()) throw Error("Network I/O Error. Could not send vector data padding");
385  }
386 }
387 
388 // Start of parallel I/O support. jhrg 8/19/15
389 void XDRStreamMarshaller::put_vector(char *val, int num, Vector &)
390 {
391  if (!val) throw InternalErr(__FILE__, __LINE__, "Could not send byte vector data. Buffer pointer is not set.");
392 
393  // write the number of members of the array being written and then set the position to 0
394  put_int(num);
395 
396  // this is the word boundary for writing xdr bytes in a vector.
397  const unsigned int add_to = 8;
398  // switch to memory on the heap since the thread will need to access it
399  // after this code returns.
400  char *byte_buf = new char[num + add_to];
401  XDR byte_sink;
402  try {
403  xdrmem_create(&byte_sink, byte_buf, num + add_to, XDR_ENCODE);
404  if (!xdr_setpos(&byte_sink, 0))
405  throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position.");
406 
407  if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, num + add_to))
408  throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data.");
409 
410  unsigned int bytes_written = xdr_getpos(&byte_sink);
411  if (!bytes_written)
412  throw Error("Network I/O Error. Could not send byte vector data - unable to get stream position.");
413 
414 #ifdef USE_POSIX_THREADS
415  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
416  tm->increment_child_thread_count();
417  tm->start_thread(MarshallerThread::write_thread, d_out, byte_buf, bytes_written);
418  xdr_destroy(&byte_sink);
419 #else
420  d_out.write(byte_buf, bytes_written);
421  xdr_destroy(&byte_sink);
422  delete [] byte_buf;
423 #endif
424 
425  }
426  catch (...) {
427  DBG(cerr << "Caught an exception in put_vector_thread" << endl);
428  xdr_destroy(&byte_sink);
429  delete [] byte_buf;
430  throw;
431  }
432 }
433 
434 // private
445 void XDRStreamMarshaller::put_vector(char *val, unsigned int num, int width, Type type)
446 {
447  assert(val || num == 0);
448 
449  // write the number of array members being written, then set the position back to 0
450  put_int(num);
451 
452  if (num == 0)
453  return;
454 
455  int use_width = width;
456  if (use_width < 4) use_width = 4;
457 
458  // the size is the number of elements num times the width of each
459  // element, then add 4 bytes for the number of elements
460  int size = (num * use_width) + 4;
461 
462  // allocate enough memory for the elements
463  //vector<char> vec_buf(size);
464  char *vec_buf = new char[size];
465  XDR vec_sink;
466  try {
467  xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
468 
469  // set the position of the sink to 0, we're starting at the beginning
470  if (!xdr_setpos(&vec_sink, 0))
471  throw Error("Network I/O Error. Could not send vector data - unable to set stream position.");
472 
473  // write the array to the buffer
474  if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type)))
475  throw Error("Network I/O Error(2). Could not send vector data - unable to encode.");
476 
477  // how much was written to the buffer
478  unsigned int bytes_written = xdr_getpos(&vec_sink);
479  if (!bytes_written)
480  throw Error("Network I/O Error. Could not send vector data - unable to get stream position.");
481 
482 #ifdef USE_POSIX_THREADS
483  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
484  tm->increment_child_thread_count();
485  tm->start_thread(MarshallerThread::write_thread, d_out, vec_buf, bytes_written);
486  xdr_destroy(&vec_sink);
487 #else
488  d_out.write(vec_buf, bytes_written);
489  xdr_destroy(&vec_sink);
490  delete [] vec_buf;
491 #endif
492  }
493  catch (...) {
494  xdr_destroy(&vec_sink);
495  delete [] vec_buf;
496  throw;
497  }
498 }
499 
511 void XDRStreamMarshaller::put_vector_part(char *val, unsigned int num, int width, Type type)
512 {
513  if (width == 1) {
514  // Add space for the 4 bytes of length info and 4 bytes for padding, even though
515  // we will not send either of those.
516  const unsigned int add_to = 8;
517  unsigned int bufsiz = num + add_to;
518  //vector<char> byte_buf(bufsiz);
519  char *byte_buf = new char[bufsiz];
520  XDR byte_sink;
521  try {
522  xdrmem_create(&byte_sink, byte_buf, bufsiz, XDR_ENCODE);
523  if (!xdr_setpos(&byte_sink, 0))
524  throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position.");
525 
526  if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, bufsiz))
527  throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data.");
528 
529 #ifdef USE_POSIX_THREADS
530  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
531  tm->increment_child_thread_count();
532 
533  // Increment the element count so we can figure out about the padding in put_vector_last()
534  d_partial_put_byte_count += num;
535 
536  tm->start_thread(MarshallerThread::write_thread_part, d_out, byte_buf, num);
537  xdr_destroy(&byte_sink);
538 #else
539  // Only send the num bytes that follow the 4 bytes of length info - we skip the
540  // length info because it's already been sent and we don't send any trailing padding
541  // bytes in this method (see put_vector_last() for that).
542  d_out.write(byte_buf + 4, num);
543 
544  if (d_out.fail())
545  throw Error ("Network I/O Error. Could not send initial part of byte vector data");
546 
547  // Now increment the element count so we can figure out about the padding in put_vector_last()
548  d_partial_put_byte_count += num;
549 
550  xdr_destroy(&byte_sink);
551  delete [] byte_buf;
552 #endif
553  }
554  catch (...) {
555  xdr_destroy(&byte_sink);
556  delete [] byte_buf;
557  throw;
558  }
559  }
560  else {
561  int use_width = (width < 4) ? 4 : width;
562 
563  // the size is the number of elements num times the width of each
564  // element, then add 4 bytes for the (int) number of elements
565  int size = (num * use_width) + 4;
566 
567  // allocate enough memory for the elements
568  //vector<char> vec_buf(size);
569  char *vec_buf = new char[size];
570  XDR vec_sink;
571  try {
572  xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
573 
574  // set the position of the sink to 0, we're starting at the beginning
575  if (!xdr_setpos(&vec_sink, 0))
576  throw Error("Network I/O Error. Could not send vector data - unable to set stream position.");
577 
578  // write the array to the buffer
579  if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type)))
580  throw Error("Network I/O Error(2). Could not send vector data -unable to encode data.");
581 
582 #ifdef USE_POSIX_THREADS
583  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
584  tm->increment_child_thread_count();
585 
586  // Increment the element count so we can figure out about the padding in put_vector_last()
587  d_partial_put_byte_count += (size - 4);
588  tm->start_thread(MarshallerThread::write_thread_part, d_out, vec_buf, size - 4);
589  xdr_destroy(&vec_sink);
590 #else
591  // write that much out to the output stream, skipping the length data that
592  // XDR writes since we have already written the length info using put_vector_start()
593  d_out.write(vec_buf + 4, size - 4);
594 
595  if (d_out.fail())
596  throw Error ("Network I/O Error. Could not send part of vector data");
597 
598  // Now increment the element count so we can figure out about the padding in put_vector_last()
599  d_partial_put_byte_count += (size - 4);
600 
601  xdr_destroy(&vec_sink);
602  delete [] vec_buf;
603 #endif
604  }
605  catch (...) {
606  xdr_destroy(&vec_sink);
607  delete [] vec_buf;
608  throw;
609  }
610  }
611 }
612 
613 void XDRStreamMarshaller::dump(ostream &strm) const
614 {
615  strm << DapIndent::LMarg << "XDRStreamMarshaller::dump - (" << (void *) this << ")" << endl;
616 }
617 
618 } // namespace libdap
619 
virtual void dump(ostream &strm) const
dump the contents of this object to the specified ostream
static void * write_thread(void *arg)
Holds a one-dimensional collection of DAP2 data types.
Definition: Vector.h:80
virtual void put_vector_part(char *val, unsigned int num, int width, Type type)
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written)
STL namespace.
virtual void put_vector_start(int num)
Type
Identifies the data type.
Definition: Type.h:94
virtual Type type() const
Returns the type of the class instance.
Definition: BaseType.cc:310
A class for software fault reporting.
Definition: InternalErr.h:64
virtual BaseType * var(const string &name="", bool exact_match=true, btp_stack *s=0)
Definition: Vector.cc:392
static void * write_thread_part(void *arg)
static xdrproc_t xdr_coder(const Type &t)
Returns a function used to encode elements of an array.
Definition: XDRUtils.cc:145
A class for error processing.
Definition: Error.h:90