summaryrefslogtreecommitdiff
path: root/libfec/fec_process.cpp
blob: 512907729e187d4732095af42bc727316a4a6a84 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/*
 * Copyright (C) 2015 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <future>
#include "fec_private.h"

struct process_info {
    int id;
    fec_handle* f;
    uint8_t* buf;
    size_t count;
    uint64_t offset;
    read_func func;
    ssize_t rc;
    size_t errors;
};

/* thread function  */
static process_info* __process(process_info* p) {
    debug("thread %d: [%" PRIu64 ", %" PRIu64 ")", p->id, p->offset, p->offset + p->count);

    p->rc = p->func(p->f, p->buf, p->count, p->offset, &p->errors);
    return p;
}

/* launches a maximum number of threads to process a read */
ssize_t process(fec_handle* f, uint8_t* buf, size_t count, uint64_t offset, read_func func) {
    check(f);
    check(buf);
    check(func);

    if (count == 0) {
        return 0;
    }

    int threads = sysconf(_SC_NPROCESSORS_ONLN);

    if (threads < WORK_MIN_THREADS) {
        threads = WORK_MIN_THREADS;
    } else if (threads > WORK_MAX_THREADS) {
        threads = WORK_MAX_THREADS;
    }

    uint64_t start = (offset / FEC_BLOCKSIZE) * FEC_BLOCKSIZE;
    size_t blocks = fec_div_round_up(offset + count - start, FEC_BLOCKSIZE);

    /* start at most one thread per block we're accessing */
    if ((size_t)threads > blocks) {
        threads = (int)blocks;
    }

    size_t count_per_thread = fec_div_round_up(blocks, threads) * FEC_BLOCKSIZE;
    size_t left = count;
    uint64_t pos = offset;
    uint64_t end = start + count_per_thread;

    debug("max %d threads, %zu bytes per thread (total %zu spanning %zu blocks)", threads,
          count_per_thread, count, blocks);

    std::vector<std::future<process_info*>> handles;
    process_info info[threads];
    ssize_t rc = 0;

    /* start threads to process queue */
    for (int i = 0; i < threads && left > 0; ++i) {
        info[i].id = i;
        info[i].f = f;
        info[i].buf = &buf[pos - offset];
        info[i].count = (size_t)(end - pos);
        info[i].offset = pos;
        info[i].func = func;
        info[i].rc = -1;
        info[i].errors = 0;

        if (info[i].count > left) {
            info[i].count = left;
        }

        handles.push_back(std::async(std::launch::async, __process, &info[i]));

        pos = end;
        end += count_per_thread;
        left -= info[i].count;
    }

    ssize_t nread = 0;

    /* wait for all threads to complete */
    for (auto&& future : handles) {
        process_info* p = future.get();
        if (!p || p->rc == -1) {
            rc = -1;
        } else {
            nread += p->rc;
            f->errors += p->errors;
        }
    }

    if (left > 0 || rc == -1) {
        errno = EIO;
        return -1;
    }

    return nread;
}