13 #ifndef STXXL_ASYNC_SCHEDULE_HEADER
14 #define STXXL_ASYNC_SCHEDULE_HEADER
20 #ifdef STXXL_BOOST_CONFIG
21 #include <boost/config.hpp>
23 #include <stxxl/bits/io/iobase.h>
25 #include <stxxl/bits/compat_hash_map.h>
26 #include <stxxl/bits/compat_hash_set.h>
29 __STXXL_BEGIN_NAMESPACE
35 inline sim_event(int_type t, int_type b) : timestamp(t), iblock(b) { }
38 struct sim_event_cmp :
public std::binary_function<sim_event, sim_event, bool>
40 inline bool operator () (
const sim_event & a,
const sim_event & b)
const
42 return a.timestamp > b.timestamp;
46 int_type simulate_async_write(
49 const int_type m_init,
51 std::pair<int_type, int_type> * o_time);
53 typedef std::pair<int_type, int_type> write_time_pair;
54 struct write_time_cmp :
public std::binary_function<write_time_pair, write_time_pair, bool>
56 inline bool operator () (
const write_time_pair & a,
const write_time_pair & b)
const
58 return a.second > b.second;
62 void compute_prefetch_schedule(
69 template <
typename run_type>
70 void simulate_async_write(
71 const run_type & input,
72 const int_type m_init,
74 std::pair<int_type, int_type> * o_time)
76 typedef std::priority_queue<sim_event, std::vector<sim_event>, sim_event_cmp> event_queue_type;
77 typedef std::queue<int_type> disk_queue_type;
79 const int_type L = input.size();
81 disk_queue_type *
disk_queues =
new disk_queue_type[L];
82 event_queue_type event_queue;
87 bool * disk_busy =
new bool[D];
91 int_type disk = input[i].bid.storage->get_id();
92 disk_queues[disk].push(i);
97 for (int_type ii = 0; ii < D; ii++)
98 if (!disk_queues[ii].empty())
100 int_type j = disk_queues[ii].front();
101 disk_queues[ii].pop();
102 event_queue.push(sim_event(1, j));
105 while (!event_queue.empty())
107 sim_event cur = event_queue.top();
109 if (oldtime != cur.timestamp)
112 for (int_type i = 0; i < D; i++)
113 disk_busy[i] =
false;
116 oldtime = cur.timestamp;
118 o_time[cur.iblock] = std::pair<int_type, int_type>(cur.iblock, cur.timestamp + 1);
124 int_type disk = input[i].bid.storage->get_id();
127 disk_queues[disk].push(i);
131 event_queue.push(sim_event(cur.timestamp + 1, i));
132 disk_busy[disk] =
true;
139 int_type disk = input[cur.iblock].bid.storage->get_id();
140 if (!disk_busy[disk] && !disk_queues[disk].empty())
142 event_queue.push(sim_event(cur.timestamp + 1, disk_queues[disk].front()));
143 disk_queues[disk].pop();
144 disk_busy[disk] =
true;
149 delete[] disk_queues;
153 template <
typename run_type>
154 void compute_prefetch_schedule(
155 const run_type & input,
156 int_type * out_first,
160 typedef std::pair<int_type, int_type> pair_type;
161 const int_type L = input.size();
164 for (int_type i = 0; i < L; ++i)
169 pair_type * write_order =
new pair_type[L];
171 simulate_async_write(input, m, D, write_order);
173 std::stable_sort(write_order, write_order + L, write_time_cmp());
175 for (int_type i = 0; i < L; i++)
176 out_first[i] = write_order[i].first;
179 delete[] write_order;
183 template <
typename b
id_iterator_type>
184 void simulate_async_write(
185 bid_iterator_type input,
187 const int_type m_init,
189 std::pair<int_type, int_type> * o_time)
191 typedef std::priority_queue<sim_event, std::vector<sim_event>, sim_event_cmp> event_queue_type;
192 typedef std::queue<int_type> disk_queue_type;
195 typedef typename compat_hash_map<int, disk_queue_type>::result disk_queues_type;
196 disk_queues_type disk_queues;
197 event_queue_type event_queue;
201 int_type oldtime = 0;
203 compat_hash_set<int>::result disk_busy;
205 while (m && (i >= 0))
207 int_type disk = (*(input + i)).storage->get_id();
208 disk_queues[disk].push(i);
222 disk_queues_type::iterator it = disk_queues.begin();
223 for ( ; it != disk_queues.end(); ++it)
225 int_type j = (*it).second.front();
227 event_queue.push(sim_event(1, j));
230 while (!event_queue.empty())
232 sim_event cur = event_queue.top();
234 if (oldtime != cur.timestamp)
241 oldtime = cur.timestamp;
243 o_time[cur.iblock] = std::pair<int_type, int_type>(cur.iblock, cur.timestamp + 1);
249 int_type disk = (*(input + i)).storage->get_id();
250 if (disk_busy.find(disk) != disk_busy.end())
252 disk_queues[disk].push(i);
256 event_queue.push(sim_event(cur.timestamp + 1, i));
257 disk_busy.insert(disk);
264 int_type disk = (*(input + cur.iblock)).storage->get_id();
265 if (disk_busy.find(disk) == disk_busy.end() && !disk_queues[disk].empty())
267 event_queue.push(sim_event(cur.timestamp + 1, disk_queues[disk].front()));
268 disk_queues[disk].pop();
269 disk_busy.insert(disk);
278 template <
typename b
id_iterator_type>
279 void compute_prefetch_schedule(
280 bid_iterator_type input_begin,
281 bid_iterator_type input_end,
282 int_type * out_first,
286 typedef std::pair<int_type, int_type> pair_type;
287 const int_type L = input_end - input_begin;
288 STXXL_VERBOSE1(
"compute_prefetch_schedule: sequence length=" << L <<
" disks=" << D);
291 for (int_type i = 0; i < L; ++i)
296 pair_type * write_order =
new pair_type[L];
298 simulate_async_write(input_begin, L, m, D, write_order);
300 std::stable_sort(write_order, write_order + L, write_time_cmp());
302 STXXL_VERBOSE1(
"Computed prefetch order for " << L <<
" blocks with " <<
303 D <<
" disks and " << m <<
" blocks");
304 for (int_type i = 0; i < L; i++)
306 out_first[i] = write_order[i].first;
307 STXXL_VERBOSE1(write_order[i].first);
310 delete[] write_order;
313 __STXXL_END_NAMESPACE
315 #endif // !STXXL_ASYNC_SCHEDULE_HEADER