Developer.

[멋사 백엔드 19기] TIL 30일차 멀티스레딩 응용 클래스

📂 목차


📚 본문

쓰레드 심화

실무에 근거하여 더 잘 활용할 수 있게 다양한 메서드들을 본다.

쓰레드와 자원

쓰레드는 처리 흐름, 자원은 데이터로 확실히 구분시켜야 한다. 그런 관점에서 볼때, repository, service 등등은 전부 흐름이고(Inmemory 는 예외), db, 임시 저장 공간(세션, 구성 정보) 등등은 전부 자원이 될 것이다.

synchronized 를 걸때는 트래픽이 많을 때 서버 성능이 느려지지 않을지를 고민해야 한다.

public Book addBook(String title,
                    String author) {
    var builder = new Book.Builder().setTitle(title)
                                    .setAuthor(author);
    Book book;

    // 동시성 이슈가 있는 부분의 코드만 lock 걸기
    synchronized (bookRepository) {
        book = builder.build(bookRepository.count() + 1);
        if (bookRepository.add(book) == null)
            throw new SystemException("책 저장 실패");
    }

    return book;
}

wait(), notify(), notifyAll()

wait, notify, notifyAll 은 모니터 락을 보유한 상태에서 호출 가능하며, critical region 에 들어가는 thread 만이 이 메서드들을 쓸 수 있다(Object 라고 다 쓸 수 있지만 코드 블럭 수준에 제약이 있음).

sleep(ms) 과의 차이는 lock 을 쥔 채로 대기하기에 얘는 critical region 에 없더라도 호출 가능하다.

Producer-Consumer Problems

Object 에는 wait, notify 가 있어서 모든 클래스들에 대해 해당 메서드를 수행함을 볼 수 있다.

여기서는 Producer-Consumer 문제로 이를 다뤄보자. 생산자는 데이터를 생성하여 버퍼에 공급을 한다. 소비자는 Buffer 에서 데이터를 꺼내어 사용을 한다. 버퍼는 생산자와 소비자가 데이터를 주고받는 공유 공간이며, 진열대라고 생각하면 된다.

다음과 같은 문제가 발생한다:

  • 버퍼가 가득 찼을 때: 생산자가 데이터를 넣으려면 대기해야 한다
  • 버퍼가 부족 할 때: 소비자가 데이터가 들어올 때까지 대기해야 한다
  • 동시 접근 문제: 여러 소비자가 동시에 데이터를 소비하려고 할 때 데이터가 깨지거나 누락될 수도 있다
구현

우선 상황 세팅을 해주자. 기본적으로 진열대가 필요할 수 있다.

static final Object lock = new Object();
static final int MAX_SIZE = 10;
static boolean[] buffer = new boolean[MAX_SIZE];
static int consumeIdx;

그 다음 생산자는 이 buffer 에 채우는 ‘행위’ 를 해야 한다.

// 생산
public static void produce() throws InterruptedException {
    synchronized (lock) {
        while(buffer[MAX_SIZE - 1])
            lock.wait();
        Arrays.fill(buffer, true);
        lock.notifyAll();
    }
    System.out.println("생산 완료 !!!, 대기 모드 on");
}

buffer 를 true 로 채운다.

// 소비
public static void consume() throws InterruptedException {
    synchronized (lock) {
        while (!buffer[MAX_SIZE - 1]) {
            System.out.println("소비 대기중");
            lock.wait();
        }
        buffer[consumeIdx] = false;
        consumeIdx = (consumeIdx + 1) % MAX_SIZE;
        lock.notify();
        System.out.println("소비함 현재 재고: " + Arrays.toString(buffer) + ", consumeIdx: " + consumeIdx);
    }
}

소비자는 위와 같이 소비하게 된다. 메인에서는 재고를 채우고 소비하면서 값이 바뀐다

Thread produce = new Thread(() -> {
    while(true) {
        produce();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
});

Thread consume = new Thread(() -> {
    while(true) {
        consume();
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
});

produce.start();
consume.start();

위와 같은 상황은 비정상적인 상황이 잘 만들어지지 않지만, 여러 소비자가 들어가게 되면 말이 다르다.

Thread produce = new Thread(() -> {
    while(true) {
        try {
            produce();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        try {
            Thread.currentThread().sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
});

Thread consume1 = new Thread(() -> {
    while(true) {
        try {
            consume();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        try {
            Thread.currentThread().sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
});

Thread consume2 = new Thread(() -> {
    while(true) {
        try {
            consume();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        try {
            Thread.currentThread().sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
});

Thread consume3 = new Thread(() -> {
    while(true) {
        try {
            consume();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        try {
            Thread.currentThread().sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
});

produce.start();
consume1.start();
consume2.start();
consume3.start();

consumer idx 가 적절하게 증가하지 않는 상황

소비함 현재 재고: [true, true, false, false, false, false, false, false, false, false], consumeIdx: 0
소비함 현재 재고: [false, false, false, false, false, false, false, false, false, false], consumeIdx: 2
소비함 현재 재고: [false, false, false, false, false, false, false, false, false, false], consumeIdx: 2

생산되기도 전에 소비

소비함 현재 재고: [false, false, true, true, true, true, false, false, false, false], consumeIdx: 2
소비함 현재 재고: [false, true, true, true, true, true, false, false, false, false], consumeIdx: 1
소비함 현재 재고: [false, false, false, true, true, true, false, false, false, false], consumeIdx: 3
6 만큼 채움 현재 재고: [false, true, true, true, true, true, false, false, false, false], prodIdx: 6

이를 정상적으로 만들기 위해 consumer 는 버퍼에 재고가 있을 때, 다시 말하면 없을 때는 대기 상태에 들어갔다가, producer 가 생산했을 때 일어나서 소비를 해야한다.

producerconsumer 가 더 이상 소비할 수 있는 재고가 없을 때 일어나야 한다. 그렇다면 쓰레드 간에 서로 깨운다, 대기해라 라는 소통을 해야하는 창구가 필요하다. 그 창구가 바로 자원 lock 이다.

static final Object lock = new Object();
...
public static void produce() throws InterruptedException {
    synchronized (lock) {
        while(buffer[MAX_SIZE - 1])
            lock.wait();
        Arrays.fill(buffer, true);
        lock.notifyAll();
    }
    System.out.println("생산 완료 !!!, 대기 모드 on");
}

public static void consume() throws InterruptedException {
    synchronized (lock) {
        while (!buffer[MAX_SIZE - 1]) {
            System.out.println("소비 대기중");
            lock.wait();
        }
        buffer[consumeIdx] = false;
        consumeIdx = (consumeIdx + 1) % MAX_SIZE;
        lock.notify();
        System.out.println("소비함 현재 재고: " + Arrays.toString(buffer) + ", consumeIdx: " + consumeIdx);
    }
}

이제 계속적으로 생산 소비가 반복될 것이다.

ExecutorService

스레드 풀을 생성하여 놀고 있는 스레드에 명령을 간편하게 내릴 수 있다. 스레드 풀의 사이즈는 스레드 개수와 똑같고, 생성은 Executors 의 정적 메서드로 한다.

// 고정 스레드 풀
ExecutorService executor = Executors.newFixedThreadPool(4);
// 단일 스레드 풀
ExecutorService executor = Executors.newSingleThreadExecutor();
// 캐시형 스레드 풀: 필요할 때 스레드 만들고 놀고 있는 스레드가 있으면 재사용
ExecutorService executor = Executors.newCachedThreadPool();

// 스케줄링 가능한 풀
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 5초 뒤에 실행
executor.schedule(() -> System.out.println("5초 뒤 실행"), 5, TimeUnit.SECONDS);
// 1초 뒤 시작, 이후 2초 간격으로 반복 실행
executor.scheduleAtFixedRate(() -> System.out.println("주기 실행"), 1, 2, TimeUnit.SECONDS);

// 병렬 처리 특화 풀
ExecutorService executor = ForkJoinPool.commonPool();

위처럼 안하고 밑처럼 세밀한 조정으로 풀을 생성할 수 있다.

Custom Thread Pool 생성

ExecutorService executor = new ThreadPoolExecutor(
        2,                // core pool size
        4,                // maximum pool size
        60L,              // idle thread keep-alive time
        TimeUnit.SECONDS, // 시간 단위
        new LinkedBlockingQueue<>(100) // 작업 큐
);
shutdown()

스레드 풀을 종료하는 메서드이다.

executor.shutdown();
  • 이미 제출된 작업은 모두 끝까지 실행됨
  • 새로운 작업 제출은 거부됨

shutdownNow() 는 강제로 모든 작업을 종료하며, 실행 중인 스레드는 인터럽트를 발생시켜 즉시 종료해버린다. 반환하는 건 List 인데 실행중인 스레드의 작업에 대한 것은 반환하지 않기 때문에 작업이 날라갈 수도 있다.

Callable & Future

함수형 인터페이스 Callable<V>V call() 을 구현해야 하며, 반환값이 있는 작업을 정의할 수 있다. 이를 통해 ExecutorService 에게 submit(Callable callable) 에 넣어 제출할 수 있다.

import java.util.concurrent.*;

public class SumExample {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);

        int[] numbers = new int[10];
        for (int i = 0; i < numbers.length; i++) numbers[i] = i + 1; // 1~10

        // 배열을 3부분으로 나눠서 합 계산
        Callable<Integer> sumPart1 = () -> {
            int sum = 0;
            for (int i = 0; i < 3; i++) sum += numbers[i];
            return sum;
        };
        Callable<Integer> sumPart2 = () -> {
            int sum = 0;
            for (int i = 3; i < 7; i++) sum += numbers[i];
            return sum;
        };
        Callable<Integer> sumPart3 = () -> {
            int sum = 0;
            for (int i = 7; i < 10; i++) sum += numbers[i];
            return sum;
        };

        Future<Integer> f1 = executor.submit(sumPart1);
        Future<Integer> f2 = executor.submit(sumPart2);
        Future<Integer> f3 = executor.submit(sumPart3);

        int totalSum = f1.get() + f2.get() + f3.get();
        System.out.println("총 합계: " + totalSum);

        executor.shutdown();
    }
}

제출 후에 얻으려면 Future<V> 이 필요한데, 비동기 작업에 대한 결과를 나타내는 객체이다. V get(), V get(long timeout, TimeUnit unit) 으로 값을 얻어올 수 있다.

여러 개 값 가져오기

ExecutorService executor = Executors.newFixedThreadPool(3);

List<Callable<String>> tasks = List.of(
    () -> { Thread.sleep(1000); return "A 완료"; },
    () -> { Thread.sleep(2000); return "B 완료"; },
    () -> { Thread.sleep(1500); return "C 완료"; }
);

List<Future<String>> futures = executor.invokeAll(tasks);

for (Future<String> f : futures) {
    System.out.println(f.get());
}

executor.shutdown();

ReentrantLock

자원에 대한 동시성 제어 메커니즘이다. 이전에는 흐름이 주된 제어 메커니즘이었다. ReentrantLock 은 이름 그대로 같은 스레드가 같은 락을 여러 번 획득 한다.

java.util.concurrent.locks.ReentrantLock

기본 사용

import java.util.concurrent.locks.ReentrantLock;

class SharedResource {
    private final ReentrantLock lock = new ReentrantLock();
    private int count = 0;

    public void increment() {
        lock.lock();
        try {
            count++;
            System.out.println(Thread.currentThread().getName() + " : " + count);
        } finally {
            lock.unlock();
        }
    }
}

시간 제한

if (lock.tryLock()) {
    try {
        // 락 얻었을 때만 실행
    } finally {
        lock.unlock();
    }
} else {
    System.out.println("락을 얻지 못해 다른 작업 수행");
}

// 일정 시간만 시도
if (lock.tryLock(2, TimeUnit.SECONDS)) {
    try {
        // 2초 안에 락 얻으면 실행
    } finally {
        lock.unlock();
    }
}
Condition
import java.util.concurrent.locks.*;

class BoundedBuffer {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull  = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final int[] buffer = new int[10];
    private int count, putPtr, takePtr;

    public void put(int x) throws InterruptedException {
        lock.lock();
        try {
            while (count == buffer.length) {
                notFull.await();  // 버퍼 꽉 차면 대기
            }
            buffer[putPtr] = x;
            putPtr = (putPtr + 1) % buffer.length;
            count++;
            notEmpty.signal();   // 소비자 깨우기
        } finally {
            lock.unlock();
        }
    }

    public int take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await(); // 버퍼 비었으면 대기
            }
            int x = buffer[takePtr];
            takePtr = (takePtr + 1) % buffer.length;
            count--;
            notFull.signal();    // 생산자 깨우기
            return x;
        } finally {
            lock.unlock();
        }
    }
}

wait/notify/notifyAllawait/signal/signalAll 로 대체하고 여러 개의 Condition 을 나누어 관리하고 싶을 때 사용할 수 있다.

CountDownLatch

lock 을 여러개 놓을 수 있다고 보면 된다. 0이 될때까지 대기할 수 있는 메커니즘이 있으며, 초기화된 값을 다 count 했을 때는 사용이 더 이상 불가하다.

import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int nThreads = 3;
        CountDownLatch latch = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            final int id = i;
            new Thread(() -> {
                System.out.println("스레드 " + id + " 준비 완료");
                latch.countDown(); // 준비 완료
            }).start();
        }

        System.out.println("모든 스레드 준비될 때까지 대기...");
        latch.await(); // 카운트가 0이 될 때까지 대기
        System.out.println("모든 스레드 준비 완료! 작업 시작!");
    }
}

BlockingQueue

public class BlockingQueueExample {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

        // 생산자
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    String item = "Item-" + i;
                    queue.put(item); // 큐가 가득 차면 대기
                    System.out.println("생산: " + item);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        // 소비자
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    String item = queue.take(); // 큐가 비면 대기
                    System.out.println("소비: " + item);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();
    }
}

CyclicBarrier

import java.util.concurrent.*;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int nThreads = 3;

        CyclicBarrier barrier = new CyclicBarrier(nThreads, () -> {
            System.out.println("모든 스레드가 도착했습니다. 다음 단계로 진행!");
        });

        for (int i = 0; i < nThreads; i++) {
            int id = i;
            new Thread(() -> {
                try {
                    System.out.println("스레드 " + id + " 작업 중...");
                    Thread.sleep((id + 1) * 1000);
                    System.out.println("스레드 " + id + " barrier 도착");
                    barrier.await(); // 모든 스레드가 여기서 대기
                    System.out.println("스레드 " + id + " 다음 단계 실행");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

람다

저번에 다뤘기에 이번에는 이론적으로 조금 빠삭하게 살펴보고 가자.

First-Class Citizen

조건

  • 변수에 담을 수 있어야 함
  • 함수의 인자로 전달할 수 있어야 함
  • 함수의 반환값으로 전달할 수 있어야 함

자바는 함수를 First-Class Citizen 으로써 사용 가능하지만, FP 언어는 아니다.

Functional Programming Language

자바는 함수형 프로그래밍이 아니다. 함수형 프로그래밍이 되려면 다음을 만족시켜야 한다.

  1. 순수 함수(Pure Function)
    • 동일 입력 -> 동일 출력
    • 함수 내부에서 외부 상태를 변경할 수 없음(즉, side effect 없음)
  2. 불변성(Immutability)
    • 데이터는 한 번 생성하면 변경하지 않음
    • 상태 변경 대신 새로운 데이터 생성
  3. 고차 함수(Higher-order Function)
    • 함수를 인자로 받거나 함수를 반환 가능
  4. 함수 조합(Function Composition)
    • 작은 함수를 조합해 더 큰 함수를 만듦
  5. 선언형 프로그래밍(Declarative)
    • 무엇을 할 지 중심, 어떻게 할 지는 최소화

자바에서는 순수 함수는 아니며, 객체 상태를 변경할 수 있기에 함수형 프로그래밍은 아니다. 다만 FP 를 흉내내고 있는 것이다.

익명 구현 객체

자바는 함수만 순수하게 존재할 수 없었기에 FP 흉내를 내려고 익명 구현 객체를 생성한다 .

// Runnable을 구현하는 이름 없는 객체
Runnable r = new Runnable() {
    public void run() {
        System.out.println("익명 구현 객체가 출력합니다.");
    }
};
r.run();

@FunctionalInterface 가 무엇인지, Runnable 이 무엇인지는 이미 전에 다뤘으므로 넘어간다. 또한 수많은 미리 정의된 FunctionalInterface 들은 다음과 같다.

FunctionalInterfaces

  • Runnable: 입력 X, 반환 X
  • Callable: 입력 X, 반환 O, 비동기

  • Supplier: 입력 X, 반환 O
    • IntSupplier
    • LongSupplier
    • DoubleSupplier
  • Consumer: 입력 X, 반환 X
    • IntConsumer
    • LongConsumer
    • DoubleConsumer
    • ObjIntConsumer: 제너릭 객체 + int 값을 입력
    • ObjLongConsumer
    • ObjDoubleConsumer
  • Predicate
    • IntPredicate
    • LongPredicate
    • DoublePredicate
  • Function<T, R>
    • IntFunction
    • LongFunction
    • DoubleFunction
    • ToIntFunction
    • ToLongFunction
    • ToDoubleFunction
  • UnaryOperator
    • IntUnaryOperator
    • LongUnaryOperator
    • DoubleUnaryOperator

  • BiConsumer
  • BiPredicate
  • BiFunction
  • BinaryOperator
    • IntBinaryOperator
    • LongBinaryOperator
    • DoubleBinaryOperator

굳이 설명은 따로 하지 않으며, 그때그때 찾아서 쓰는 것으로 한다.