다중 스레딩과 동시성(C++11)

2025. 3. 28. 10:04Programming Languages/C++

9.4 다중 스레딩과 동시성(C++11)

C++11부터는 스레드와 동시성을 위한 표준 라이브러리가 추가되었습니다.

9.4.1 스레드 기본 사용

#include <iostream>
#include <thread>
#include <string>
#include <vector>

// 간단한 스레드 함수
void hello() {
    std::cout << "Hello from thread " << std::this_thread::get_id() << std::endl;
}

// 매개변수를 받는 스레드 함수
void printMessage(const std::string& message, int count) {
    for (int i = 0; i < count; i++) {
        std::cout << "Thread " << std::this_thread::get_id() 
                  << ": " << message << " " << i << std::endl;
        
        // 스레드 잠시 대기
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

// 값 반환 함수 (참조로 결과 반환)
void calculateSum(const std::vector<int>& numbers, int start, int end, int& result) {
    result = 0;
    for (int i = start; i < end && i < numbers.size(); i++) {
        result += numbers[i];
    }
}

int main() {
    // 기본 스레드 생성 및 실행
    std::thread t1(hello);
    t1.join();  // 스레드가 완료될 때까지 대기
    
    // 매개변수가 있는 스레드
    std::thread t2(printMessage, "Message", 5);
    
    // 람다 표현식으로 스레드 생성
    std::thread t3([]() {
        for (int i = 0; i < 3; i++) {
            std::cout << "Lambda thread: " << i << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
        }
    });
    
    // 메인 스레드에서 작업 수행
    for (int i = 0; i < 3; i++) {
        std::cout << "Main thread: " << i << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(150));
    }
    
    // 스레드 조인 (완료 대기)
    t2.join();
    t3.join();
    
    // 스레드 분리 (백그라운드로 실행)
    std::thread t4(printMessage, "Background", 3);
    t4.detach();
    
    // 분리된 스레드가 조인 가능한지 확인
    std::cout << "t4 조인 가능? " << std::boolalpha << t4.joinable() << std::endl;
    
    // 하드웨어 동시성 정보
    std::cout << "하드웨어 동시성: " << std::thread::hardware_concurrency() << std::endl;
    
    // 결과를 참조로 반환하는 스레드
    std::vector<int> numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    int sum1 = 0, sum2 = 0;
    
    std::thread t5(calculateSum, std::ref(numbers), 0, 5, std::ref(sum1));
    std::thread t6(calculateSum, std::ref(numbers), 5, 10, std::ref(sum2));
    
    t5.join();
    t6.join();
    
    std::cout << "부분합1: " << sum1 << ", 부분합2: " << sum2 << std::endl;
    std::cout << "전체 합: " << sum1 + sum2 << std::endl;
    
    // 잠시 대기 (분리된 스레드를 위해)
    std::this_thread::sleep_for(std::chrono::seconds(1));
    
    return 0;
}

9.4.2 뮤텍스와 락(Mutex & Lock)

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <chrono>

class Counter {
private:
    int value;
    std::mutex mutex;  // 뮤텍스 선언
    
public:
    Counter() : value(0) {}
    
    // 뮤텍스를 사용한 증가 함수
    void increment() {
        std::lock_guard<std::mutex> lock(mutex);  // RAII 스타일의 락
        
        // 임계 영역 (Critical Section)
        ++value;
    }
    
    // 다른 방식의 뮤텍스 사용
    void incrementWithUniqueLock() {
        std::unique_lock<std::mutex> lock(mutex);  // 더 유연한 락
        
        // 락을 일시적으로 해제
        lock.unlock();
        // 중요하지 않은 작업 수행...
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        
        // 락 다시 획득
        lock.lock();
        ++value;
        // lock 소멸 시 자동으로 뮤텍스 해제
    }
    
    // 락 없이 값 읽기 (스레드 안전하지 않음)
    int getValue() const {
        return value;
    }
    
    // 락을 사용한 값 읽기 (스레드 안전)
    int getValueSafe() {
        std::lock_guard<std::mutex> lock(mutex);
        return value;
    }
};

void workerFunction(Counter& counter, int iterations) {
    for (int i = 0; i < iterations; i++) {
        counter.increment();
    }
}

int main() {
    Counter counter;
    const int numThreads = 10;
    const int iterationsPerThread = 1000;
    
    // 여러 스레드 생성
    std::vector<std::thread> threads;
    
    for (int i = 0; i < numThreads; i++) {
        threads.push_back(std::thread(workerFunction, std::ref(counter), iterationsPerThread));
    }
    
    // 모든 스레드 완료 대기
    for (auto& t : threads) {
        t.join();
    }
    
    // 결과 확인
    std::cout << "Counter 값: " << counter.getValue() << std::endl;
    std::cout << "예상 값: " << numThreads * iterationsPerThread << std::endl;
    
    // 데드락 방지를 위한 std::lock 사용 예제
    std::mutex mutex1, mutex2;
    
    std::thread t1([&]() {
        // 두 뮤텍스를 안전하게 잠금
        std::lock(mutex1, mutex2);
        
        // 잠금 소유권 가져오기
        std::lock_guard<std::mutex> lock1(mutex1, std::adopt_lock);
        std::lock_guard<std::mutex> lock2(mutex2, std::adopt_lock);
        
        std::cout << "Thread 1: 두 뮤텍스 모두 잠금" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    });
    
    std::thread t2([&]() {
        // 반대 순서로 시도해도 데드락 발생하지 않음
        std::lock(mutex2, mutex1);
        
        std::lock_guard<std::mutex> lock2(mutex2, std::adopt_lock);
        std::lock_guard<std::mutex> lock1(mutex1, std::adopt_lock);
        
        std::cout << "Thread 2: 두 뮤텍스 모두 잠금" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    });
    
    t1.join();
    t2.join();
    
    // C++17의 scoped_lock 사용 (더 간단)
    std::thread t3([&]() {
        std::scoped_lock lock(mutex1, mutex2);  // 여러 뮤텍스를 한 번에 잠금
        std::cout << "Thread 3: 두 뮤텍스 모두 잠금 (scoped_lock 사용)" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    });
    
    t3.join();
    
    return 0;
}

9.4.3 조건 변수(Condition Variables)

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>

// 간단한 스레드 안전 큐
template<typename T>
class ThreadSafeQueue {
private:
    std::queue<T> queue;
    mutable std::mutex mutex;
    std::condition_variable cond;
    
public:
    // 큐에 요소 추가
    void push(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex);
            queue.push(item);
            std::cout << "항목 추가: " << item << std::endl;
        }
        cond.notify_one();  // 대기 중인 스레드 하나 깨우기
    }
    
    // 큐에서 요소 가져오기 (조건 변수 사용)
    T pop() {
        std::unique_lock<std::mutex> lock(mutex);
        
        // 큐가 비어있지 않을 때까지 대기
        cond.wait(lock, [this]{ return !queue.empty(); });
        
        T item = queue.front();
        queue.pop();
        
        std::cout << "항목 제거: " << item << std::endl;
        return item;
    }
    
    // 큐가 비어있는지 확인
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex);
        return queue.empty();
    }
    
    // 큐의 크기 반환
    size_t size() const {
        std::lock_guard<std::mutex> lock(mutex);
        return queue.size();
    }
};

// 생산자 함수
void producer(ThreadSafeQueue<int>& queue, int start, int count) {
    for (int i = 0; i < count; i++) {
        int item = start + i;
        queue.push(item);
        
        // 생산 간격 조절
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

// 소비자 함수
void consumer(ThreadSafeQueue<int>& queue, int count) {
    for (int i = 0; i < count; i++) {
        int item = queue.pop();  // 큐가 비어있으면 대기
        
        // 소비 간격 조절
        std::this_thread::sleep_for(std::chrono::milliseconds(150));
    }
}

int main() {
    ThreadSafeQueue<int> queue;
    
    // 두 개의 생산자와 한 개의 소비자 생성
    std::thread producerThread1(producer, std::ref(queue), 100, 5);
    std::thread producerThread2(producer, std::ref(queue), 200, 5);
    std::thread consumerThread(consumer, std::ref(queue), 10);
    
    // 스레드 종료 대기
    producerThread1.join();
    producerThread2.join();
    consumerThread.join();
    
    // 조건 변수의 다른 사용 예: 데이터 준비 대기
    std::mutex dataMutex;
    std::condition_variable dataReady;
    bool dataProcessed = false;
    int sharedData = 0;
    
    // 데이터 처리 스레드
    std::thread processorThread([&]() {
        std::cout << "처리 스레드: 데이터 대기 중..." << std::endl;
        
        std::unique_lock<std::mutex> lock(dataMutex);
        
        // predicate가 false인 동안 대기
        dataReady.wait(lock, [&]{ return dataProcessed; });
        
        std::cout << "처리 스레드: 데이터 처리 완료: " << sharedData << std::endl;
    });
    
    // 잠시 대기 후 데이터 설정
    std::this_thread::sleep_for(std::chrono::seconds(2));
    
    {
        std::lock_guard<std::mutex> lock(dataMutex);
        sharedData = 42;
        dataProcessed = true;
        std::cout << "메인 스레드: 데이터 준비 완료" << std::endl;
    }
    
    dataReady.notify_one();
    processorThread.join();
    
    return 0;
}

9.4.4 future와 promise

#include <iostream>
#include <thread>
#include <future>
#include <chrono>
#include <vector>
#include <numeric>
#include <random>

// future를 반환하는 함수 (비동기 작업)
std::future<int> calculateSumAsync(const std::vector<int>& numbers) {
    // packaged_task 또는 promise 대신 간단히 async 사용
    return std::async(std::launch::async, [numbers]() {
        std::cout << "비동기 합계 계산 시작 (스레드 ID: " 
                  << std::this_thread::get_id() << ")" << std::endl;
        
        // 시간이 오래 걸리는 작업 시뮬레이션
        std::this_thread::sleep_for(std::chrono::seconds(2));
        
        return std::accumulate(numbers.begin(), numbers.end(), 0);
    });
}

// promise와 future 사용 예제
void calculateProduct(std::promise<int>&& promise, const std::vector<int>& numbers) {
    try {
        std::cout << "곱셈 계산 시작 (스레드 ID: " 
                  << std::this_thread::get_id() << ")" << std::endl;
        
        // 시간이 오래 걸리는 작업 시뮬레이션
        std::this_thread::sleep_for(std::chrono::seconds(1));
        
        int result = 1;
        for (int num : numbers) {
            result *= num;
            
            // 에러 상황 시뮬레이션
            if (result > 10000) {
                throw std::overflow_error("결과가 너무 큽니다");
            }
        }
        
        // 결과 설정
        promise.set_value(result);
    }
    catch (...) {
        // 예외 전달
        promise.set_exception(std::current_exception());
    }
}

// packaged_task 사용 예제
int findMax(const std::vector<int>& numbers) {
    std::cout << "최댓값 검색 시작 (스레드 ID: " 
              << std::this_thread::get_id() << ")" << std::endl;
    
    // 시간이 오래 걸리는 작업 시뮬레이션
    std::this_thread::sleep_for(std::chrono::milliseconds(1500));
    
    return *std::max_element(numbers.begin(), numbers.end());
}

int main() {
    std::vector<int> numbers = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    
    std::cout << "메인 스레드 ID: " << std::this_thread::get_id() << std::endl;
    
    // std::async 사용 (가장 간단한 방법)
    std::future<int> sumFuture = calculateSumAsync(numbers);
    
    // future가 준비될 때까지 대기하지 않고 다른 작업 수행
    std::cout << "합계가 계산되는 동안 다른 작업 수행..." << std::endl;
    
    // promise와 future 사용
    std::promise<int> productPromise;
    std::future<int> productFuture = productPromise.get_future();
    
    std::thread productThread(calculateProduct, 
                             std::move(productPromise), 
                             std::vector<int>{1, 2, 3, 4, 5});
    
    // packaged_task와 future 사용
    std::packaged_task<int(const std::vector<int>&)> maxTask(findMax);
    std::future<int> maxFuture = maxTask.get_future();
    
    std::thread maxThread(std::move(maxTask), std::ref(numbers));
    
    // 결과 대기 및 사용
    try {
        // 합계 결과 가져오기 (완료될 때까지 대기)
        int sum = sumFuture.get();
        std::cout << "합계 결과: " << sum << std::endl;
        
        // 시간 제한으로 곱셈 결과 대기
        auto status = productFuture.wait_for(std::chrono::seconds(2));
        
        if (status == std::future_status::ready) {
            int product = productFuture.get();
            std::cout << "곱셈 결과: " << product << std::endl;
        }
        else if (status == std::future_status::timeout) {
            std::cout << "곱셈 결과 대기 시간 초과" << std::endl;
        }
        
        // 최댓값 결과 가져오기
        int maxValue = maxFuture.get();
        std::cout << "최댓값: " << maxValue << std::endl;
    }
    catch (const std::exception& e) {
        std::cerr << "예외 발생: " << e.what() << std::endl;
    }
    
    // 스레드 종료 대기
    if (productThread.joinable()) {
        productThread.join();
    }
    
    if (maxThread.joinable()) {
        maxThread.join();
    }
    
    // shared_future 사용 예제 (여러 스레드에서 동일한 결과 사용)
    std::promise<int> sharedPromise;
    std::shared_future<int> sharedFuture = sharedPromise.get_future().share();
    
    auto waitAndPrint = [](std::shared_future<int> future, int id) {
        std::cout << "스레드 " << id << " 결과 대기 중..." << std::endl;
        try {
            int value = future.get();  // 여러 스레드에서 안전하게 호출 가능
            std::cout << "스레드 " << id << " 결과 수신: " << value << std::endl;
        }
        catch (const std::exception& e) {
            std::cerr << "스레드 " << id << " 예외 수신: " << e.what() << std::endl;
        }
    };
    
    std::vector<std::thread> threads;
    for (int i = 0; i < 3; i++) {
        threads.push_back(std::thread(waitAndPrint, sharedFuture, i));
    }
    
    // 모든 스레드에 값 전달
    std::this_thread::sleep_for(std::chrono::seconds(1));
    sharedPromise.set_value(99);
    
    // 모든 스레드 종료 대기
    for (auto& t : threads) {
        t.join();
    }
    
    return 0;
}

'Programming Languages > C++' 카테고리의 다른 글

람다 표현식과 함수형 프로그래밍  (0) 2025.03.28
스마트 포인터(Smart Pointers) (C++11)  (0) 2025.03.28
파일 입출력  (0) 2025.03.28
고급 주제  (0) 2025.03.28
챕터8. 실습 문제  (0) 2025.03.28