source: branches/parallel/src/TableTraverse.cpp @ 2287

Last change on this file since 2287 was 2287, checked in by KohjiNakamura, 13 years ago

modernQSort bug fix

  • Property svn:keywords set to Id
File size: 11.5 KB
Line 
1//
2// C++ Interface: Scantable
3//
4// Description:
5//
6// Copyright: See COPYING file that comes with this distribution
7//
8
9#include <stdio.h>
10#include <stdlib.h>
11
12#include <TableTraverse.h>
13#include <tables/Tables/TableColumn.h>
14#include <tables/Tables/PlainColumn.h>
15#include <tables/Tables/DataManager.h>
16
17#define TIMING 0
18
19#if TIMING
20#include <sys/time.h>
21#endif
22
23static const char version[] = "$Id: TableTraverse.cpp 2287 2011-09-05 08:30:08Z KohjiNakamura $";
24
25using namespace casa;
26
27namespace asap {
28  class Comparator {
29  public:
30    virtual ~Comparator() {}
31    virtual int compare(const void *a, const void *b) = 0;
32  };
33
34  class CompContext: public Comparator {
35    uInt nCols;
36    void const *const *colValues;
37    const TypeManager *const* typeManagers;
38  public:
39    CompContext(uInt nCols, void const *const colValues[],
40                const TypeManager *const typeManagers[])
41      : nCols(nCols), colValues(colValues), typeManagers(typeManagers) {
42      DebugAssert(colValues != NULL, AipsError);
43      DebugAssert(typeManagers != NULL, AipsError);
44      for (uInt i = 0; i < nCols; i++) {
45        DebugAssert(colValues[i] != NULL, AipsError);
46        DebugAssert(typeManagers[i] != NULL, AipsError);
47      }
48    }
49
50    virtual int compare(const void *a, const void *b) {
51      uInt ai = *(uInt*)a;
52      uInt bi = *(uInt*)b;
53      for (size_t i = 0; i < nCols; i++) {
54        const size_t valueSize = typeManagers[i]->sizeOf();
55        const char *values = (const char *)colValues[i];
56        int diff = typeManagers[i]->getComparator()->comp(&values[ai*valueSize], &values[bi*valueSize]);
57        if (diff != 0) {
58          return diff;
59        }
60      }
61      return 0;
62    }
63  };
64}
65
66#if TIMING
67static double currenttime()
68{
69  struct timeval tv;
70  int result = gettimeofday(&tv, NULL);
71  return tv.tv_sec + ((double)tv.tv_usec) / 1000000.;
72}
73#endif
74
75#ifdef HAVE_QSORT_R
76
77static int compWithComparator(void *arg, const void *a, const void *b)
78{
79        return ((asap::Comparator *)arg)->compare(a, b);
80}
81
82#else
83
84static inline void swap_(char *a, char *b, size_t size)
85{
86  char tmp[size];
87  char *p = tmp;
88  do {
89    *p = *a;
90    *a++ = *b;
91    *b++ = *p++;
92  } while (--size > 0);
93}
94
95static inline void cpy(char *dest, char *src, size_t size)
96{
97  for (size_t i = 0; i < size; i++) {
98    dest[i] = src[i];
99  }
100}
101
102// returning NULL means, already sorted.
103static inline char *quickFindPivot(char *const left, char *const right,
104                                   const size_t size, asap::Comparator *comp)
105{
106  char *const m = &left[(((right - left) / size) / 2 ) * size];
107  char *result = NULL;
108  if (left == m) { // This means there are at most 2 elements.
109    int diff = (left == right) ? 0 : comp->compare(left, right);
110    if (diff > 0) {
111      swap_(left, right, size);
112    }
113    if (diff != 0) {
114      return right;
115    }
116    return result;
117  }
118
119  int diff = comp->compare(left, m);
120  if (diff > 0) {
121    swap_(left, m, size);
122  }
123  int diff2 = comp->compare(m, right);
124  if (diff == 0 && diff2 == 0) {
125    return result;
126  }
127  result = m;
128  if (diff2 > 0) {
129    swap_(right, m, size);
130    int diff3 = comp->compare(left, m);
131    if (diff3 > 0) {
132      swap_(left, m, size);
133    } else if (diff3 == 0) {
134      result = right;
135    }
136  } else if (diff == 0) {
137    result = right;
138  }
139  return result;
140}
141
142static inline char *findPivot(char *const start, char *const end,
143                              size_t size, asap::Comparator *comp)
144{
145  char *result = quickFindPivot(start, end, size, comp);
146  if (result || &start[2*size] >= end) {
147    return result;
148  }
149
150  for (char *pivot = start + size; pivot <= end; pivot += size) {
151    int diff = comp->compare(start, pivot);
152    if (diff < 0) {
153      return pivot;
154    } else if (diff > 0) {
155      swap_(start, pivot, size);
156      return pivot;
157    }
158  }
159  return NULL; // all values are same.
160}
161
162static void sort_(char *const left, char *const right, const size_t size,
163                  asap::Comparator *comp, int level)
164{
165  char *const m = findPivot(left, right, size, comp);
166  if (m == NULL) {
167    return;
168  }
169  char *l = left;
170  char *r = right;
171  char pivot[size]  __attribute__ ((aligned (16)));
172  cpy(pivot, m, size);
173  while (true) {
174    while (l < r && comp->compare(l, pivot) < 0) {
175      l += size;
176    }
177    while (l < r && comp->compare(pivot, r) <= 0) {
178      r -= size;
179    }
180    if (l >= r) {
181      break;
182    }
183    swap_(l, r, size);
184    l += size;
185    r -= size;
186  }
187  if (l == r && comp->compare(l, pivot) < 0) {
188    l += size;
189  }
190  sort_(left, l - size, size, comp, level + 1);
191  sort_(l, right, size, comp, level + 1);
192}
193
194#endif
195
196static inline void modernQSort(void *base, size_t elements, size_t size,
197                               asap::Comparator *comp)
198{
199  if (elements > 1) {
200#ifdef HAVE_QSORT_R
201    qsort_r(base, elements, size, comp, compWithComparator);
202#else
203    sort_((char *)base, &((char *)base)[(elements - 1) * size], size, comp, 0);
204#endif
205  }
206}
207
208namespace asap {
209  class ROTableColumnBackDoor: public ROTableColumn {
210  public:
211    static BaseColumn *baseColumnPtr(const ROTableColumn *col)
212    {
213      return ((ROTableColumnBackDoor*)col)->baseColPtr();
214    }
215  };
216
217  static void copyColumnData(void *colValues, size_t elementSize, uInt nRows,
218                             BaseColumn *refCol)
219  {
220    char *base = (char *)colValues;
221    for (uInt i = 0; i < nRows; i++) {
222      refCol->get(i, &base[elementSize * i]);
223    }
224  }
225
226  void traverseTable(const Table &table,
227                     const char *const columnNames[],
228                     const TypeManager *const typeManagers[],
229                     TableVisitor *visitor,
230                     Bool doSort)
231  {
232#if TIMING
233    double s = currenttime();
234#endif
235    uInt colCount = 0;
236    for (; columnNames[colCount]; colCount++) {
237      AlwaysAssert(typeManagers[colCount], AipsError);
238    }
239
240    ROTableColumn *cols[colCount];
241    void *colValues[colCount];
242    for (uInt i = 0; i < colCount; i++) {
243      cols[i] = NULL;
244      colValues[i] = NULL;
245    }
246    size_t sizes[colCount];
247    const uInt nRows = table.nrow();
248    for (uInt i = 0; i < colCount; i++) {
249      cols[i] = new ROTableColumn(table, columnNames[i]);
250      colValues[i] = typeManagers[i]->allocArray(nRows);
251      sizes[i] = typeManagers[i]->sizeOf();
252      BaseColumn *baseCol = ROTableColumnBackDoor::baseColumnPtr(cols[i]);
253      PlainColumn *col = dynamic_cast <PlainColumn *>(baseCol);
254      if (col) {
255        const uInt gotRows = col->dataManagerColumn()->
256          getBlockV(0, nRows, colValues[i]);
257        DebugAssert(gotRows == nRows, AipsError);
258      } else {
259        copyColumnData(colValues[i], typeManagers[i]->sizeOf(), nRows, baseCol);
260      }
261    }
262
263    uInt *idx = new uInt[nRows];
264    for (uInt i = 0; i < nRows; i++) {
265      idx[i] = i;
266    }
267
268    try {
269      if (doSort) {
270        CompContext compCtx(colCount, colValues, typeManagers);
271        modernQSort(idx, nRows, sizeof(idx[0]), &compCtx);
272      }
273
274      visitor->start();
275      Bool first = True;
276      for (uInt i = 0; i < nRows; i++) {
277        if (visitor->visit(first, idx[i], colCount, colValues) == False) {
278          break;
279        }
280        first = False;
281      }
282      visitor->finish();
283    } catch (...) {
284      delete[] idx;
285      for (uInt i = 0; i < colCount; i++) {
286        typeManagers[i]->freeArray(colValues[i]);
287        delete cols[i];
288      }
289      throw;
290    }
291
292    delete[] idx;
293    for (uInt i = 0; i < colCount; i++) {
294      typeManagers[i]->freeArray(colValues[i]);
295      delete cols[i];
296    }
297
298#if TIMING
299    double e = currenttime();
300    printf("%s took %.3f sec\n", __PRETTY_FUNCTION__, e - s);
301#endif
302  }
303}
304
305#if 0 && TIMING
306#include <assert.h>
307
308#define elementsof(x) (sizeof(x)/sizeof(*x))
309
310using namespace asap ;
311
312class IntComp: public Comparator {
313public:
314  virtual int compare(const void *a, const void *b) {
315    return (*(int*)a) - (*(int*)b);
316  }
317};
318
319static int compare(const void *a, const void *b) {
320  return (*(int*)a) - (*(int*)b);
321}
322
323int main()
324{
325  IntComp myComp;
326  srand((int)currenttime());
327  const size_t N = 1024*1024*100;
328  int *x = new int[N];
329  int xx[] = {
330    5, 3,0,  1, 2,4, 4, 4
331  };
332#if 0
333  {
334    int x[] = {1};
335    char *p = findPivot((char *)x, (char *)&x[0], sizeof(x[0]), &myComp);
336    assert(p == NULL);
337  }
338  {
339    int x[] = {1, 1};
340    char *p = findPivot((char *)x, (char *)&x[1], sizeof(x[0]), &myComp);
341    assert(p == NULL);
342  }
343  {
344    int x[] = {2, 1};
345    char *p = findPivot((char *)x, (char *)&x[1], sizeof(x[0]), &myComp);
346    assert((int*)p == &x[1] && x[0] == 1 && x[1] == 2);
347  }
348  {
349    int x[] = {1, 2};
350    char *p = findPivot((char *)x, (char *)&x[1], sizeof(x[0]), &myComp);
351    assert((int*)p == &x[1] && x[0] == 1 && x[1] == 2);
352  }
353  {
354    int x[] = {1, 1, 1};
355    char *p = findPivot((char *)x, (char *)&x[2], sizeof(x[0]), &myComp);
356    assert(p == NULL);
357  }
358  {
359    int x[] = {2, 1, 1};
360    char *p = findPivot((char *)x, (char *)&x[2], sizeof(x[0]), &myComp);
361    assert((int*)p == &x[2]);
362    assert(x[0] == 1 && x[1] == 1 && x[2] == 2);
363  }
364  {
365    int x[] = {1, 2, 1};
366    char *p = findPivot((char *)x, (char *)&x[2], sizeof(x[0]), &myComp);
367    assert((int*)p == &x[2]);
368    assert(x[0] == 1 && x[1] == 1 && x[2] == 2);
369  }
370  {
371    int x[] = {1, 1, 2};
372    char *p = findPivot((char *)x, (char *)&x[2], sizeof(x[0]), &myComp);
373    assert((int*)p == &x[2]);
374    assert(x[0] == 1 && x[1] == 1 && x[2] == 2);
375  }
376  {
377    int x[] = {2, 2, 1};
378    char *p = findPivot((char *)x, (char *)&x[2], sizeof(x[0]), &myComp);
379    assert((int*)p == &x[1]);
380    assert(x[0] == 1 && x[1] == 2 && x[2] == 2);
381  }
382  {
383    int x[] = {2, 1, 2};
384    char *p = findPivot((char *)x, (char *)&x[2], sizeof(x[0]), &myComp);
385    assert((int*)p == &x[1]);
386    assert(x[0] == 1 && x[1] == 2 && x[2] == 2);
387  }
388  {
389    int x[] = {1, 2, 2};
390    char *p = findPivot((char *)x, (char *)&x[2], sizeof(x[0]), &myComp);
391    assert((int*)p == &x[1]);
392    assert(x[0] == 1 && x[1] == 2 && x[2] == 2);
393  }
394  {
395    int x[] = {1, 2, 3};
396    char *p = findPivot((char *)x, (char *)&x[2], sizeof(x[0]), &myComp);
397    assert((int*)p == &x[1]);
398    assert(x[0] == 1 && x[1] == 2 && x[2] == 3);
399  }
400  {
401    int x[] = {2, 1, 3};
402    char *p = findPivot((char *)x, (char *)&x[2], sizeof(x[0]), &myComp);
403    assert((int*)p == &x[1]);
404    assert(x[0] == 1 && x[1] == 2 && x[2] == 3);
405  }
406  {
407    int x[] = {1, 3, 2};
408    char *p = findPivot((char *)x, (char *)&x[2], sizeof(x[0]), &myComp);
409    assert((int*)p == &x[1]);
410    assert(x[0] == 1 && x[1] == 2 && x[2] == 3);
411  }
412  {
413    int x[] = {3, 1, 2};
414    char *p = findPivot((char *)x, (char *)&x[2], sizeof(x[0]), &myComp);
415    assert((int*)p == &x[1]);
416    assert(x[0] == 1 && x[1] == 2 && x[2] == 3);
417  }
418  {
419    int x[] = {3, 2, 1};
420    char *p = findPivot((char *)x, (char *)&x[2], sizeof(x[0]), &myComp);
421    assert((int*)p == &x[1]);
422    assert(x[0] == 1 && x[1] == 2 && x[2] == 3);
423  }
424  {
425    int x[] = {2, 3, 1};
426    char *p = findPivot((char *)x, (char *)&x[2], sizeof(x[0]), &myComp);
427    assert((int*)p == &x[1]);
428    assert(x[0] == 1 && x[1] == 2 && x[2] == 3);
429  }
430  {
431    int x[] = {36, 39, 42, 41, 36};
432    char *p = findPivot((char *)x, (char *)&x[4], sizeof(x[0]), &myComp);
433    assert((int*)p == &x[4]);
434    assert(x[4] == 42);
435  }
436
437  //assert(("assert enabled: OK", false));
438  modernQSort(x, 8, sizeof(x[0]), &myComp);
439  for (int i = 0; i < 8; i++) {
440    fprintf(stderr, "%d\n", x[i]);
441  }
442#endif
443  fprintf(stderr, "init\n");
444  for (int i = 0; i < N; i++) {
445    x[i] = rand();
446  }
447  fprintf(stderr, "sorting\n");
448  modernQSort(x, 1, sizeof(x[0]), &myComp);
449  modernQSort(x, 2, sizeof(x[0]), &myComp);
450  modernQSort(x, 3, sizeof(x[0]), &myComp);
451  if (! getenv("QSORT")) {
452    double _s = currenttime();
453    modernQSort(x, N, sizeof(x[0]), &myComp);
454    double _e = currenttime();
455    fprintf(stderr, "modernSorted %.3f\n", _e - _s);
456  } else {
457    double _s = currenttime();
458    qsort(x, N, sizeof(x[0]), compare);
459    double _e = currenttime();
460    fprintf(stderr, "qsorted %.3f\n", _e - _s);
461  }
462  fprintf(stderr, "outputting\n");
463  for (int i = 0; i < N; i++) {
464    printf("%d\n", x[i]);
465  }
466  delete[] x;
467}
468#endif
Note: See TracBrowser for help on using the repository browser.