summaryrefslogtreecommitdiff
path: root/libfec
diff options
context:
space:
mode:
authorKelvin Zhang <zhangkelvin@google.com>2023-05-08 16:20:26 -0700
committerKelvin Zhang <zhangkelvin@google.com>2023-05-08 16:20:26 -0700
commitf2e4572135146fc7751a9fee1c1a0e441b826da4 (patch)
tree0c418071ebbb778d5f4442ed34d7eea6479a3173 /libfec
parent29d033732dad468ec7c6936571f42749bbdfbf35 (diff)
downloadextras-f2e4572135146fc7751a9fee1c1a0e441b826da4.tar.gz
Cleanup fec_process to use modern C++ primitives
Test: th Bug: 279705058 Change-Id: I644fcee0f206b2f9523f413d82650fe0ea8aba6d
Diffstat (limited to 'libfec')
-rw-r--r--libfec/fec_process.cpp40
1 files changed, 12 insertions, 28 deletions
diff --git a/libfec/fec_process.cpp b/libfec/fec_process.cpp
index 46f48bc7..51290772 100644
--- a/libfec/fec_process.cpp
+++ b/libfec/fec_process.cpp
@@ -14,12 +14,13 @@
* limitations under the License.
*/
+#include <future>
#include "fec_private.h"
struct process_info {
int id;
- fec_handle *f;
- uint8_t *buf;
+ fec_handle* f;
+ uint8_t* buf;
size_t count;
uint64_t offset;
read_func func;
@@ -28,21 +29,15 @@ struct process_info {
};
/* thread function */
-static void * __process(void *cookie)
-{
- process_info *p = static_cast<process_info *>(cookie);
-
- debug("thread %d: [%" PRIu64 ", %" PRIu64 ")", p->id, p->offset,
- p->offset + p->count);
+static process_info* __process(process_info* p) {
+ debug("thread %d: [%" PRIu64 ", %" PRIu64 ")", p->id, p->offset, p->offset + p->count);
p->rc = p->func(p->f, p->buf, p->count, p->offset, &p->errors);
return p;
}
/* launches a maximum number of threads to process a read */
-ssize_t process(fec_handle *f, uint8_t *buf, size_t count, uint64_t offset,
- read_func func)
-{
+ssize_t process(fec_handle* f, uint8_t* buf, size_t count, uint64_t offset, read_func func) {
check(f);
check(buf);
check(func);
@@ -75,7 +70,7 @@ ssize_t process(fec_handle *f, uint8_t *buf, size_t count, uint64_t offset,
debug("max %d threads, %zu bytes per thread (total %zu spanning %zu blocks)", threads,
count_per_thread, count, blocks);
- std::vector<pthread_t> handles;
+ std::vector<std::future<process_info*>> handles;
process_info info[threads];
ssize_t rc = 0;
@@ -94,30 +89,19 @@ ssize_t process(fec_handle *f, uint8_t *buf, size_t count, uint64_t offset,
info[i].count = left;
}
- pthread_t thread;
-
- if (pthread_create(&thread, NULL, __process, &info[i]) != 0) {
- error("failed to create thread: %s", strerror(errno));
- rc = -1;
- } else {
- handles.push_back(thread);
- }
+ handles.push_back(std::async(std::launch::async, __process, &info[i]));
pos = end;
- end += count_per_thread;
+ end += count_per_thread;
left -= info[i].count;
}
ssize_t nread = 0;
/* wait for all threads to complete */
- for (auto thread : handles) {
- process_info *p = NULL;
-
- if (pthread_join(thread, (void **)&p) != 0) {
- error("failed to join thread: %s", strerror(errno));
- rc = -1;
- } else if (!p || p->rc == -1) {
+ for (auto&& future : handles) {
+ process_info* p = future.get();
+ if (!p || p->rc == -1) {
rc = -1;
} else {
nread += p->rc;