Developer.

[멋사 백엔드 19기] TIL 11일차

🪛 한계점

실무에서 이론을 적용시키기에는 몸이 단련이 되어 익숙해있어야 한다.
미니 프로젝트를 통해서 체화 되도록 연습해야한다.

📂 목차


📚 본문

미니프로젝트를 하면서 필요했던 것들을 구현해보자. 여기서는 순수 자바에 대한 구현에 대해 말한다.

Context

어플리케이션의 상태, 환경 정보, 공통 리소스 등을 중앙에서 관리하는 역할을 한다.

  • RequestContext: 요청별 정보(사용자, 세션, 트랜잭션) 등을 저장한다.
  • ThreadPool + Context: 스레드 풀에서 작업을 실행할 때,
    작업이 필요한 환경 정보나 리소스를 컨텍스트에서 가져오게 된다.

두 번째 형태는 보통 순수 자바 서버에서 특정 시스템을 구현할 때 사용한다고 한다.

ThreadLocal

우선 작업을 처리할 스레드 여러 개를 다룰 수 있게 스레드 한 단위를 구현해보자.

순수 자바에서는 ThreadLocal 이라는 클래스를 제공하는데, 해당 인스턴스는 독립적으로
변수 저장소를 가지고, 일반적인 인스턴스 변수나 static 변수와는 달리 thread 마다 별도의 값을 지닌다.

따라서 여러 스레드가 같은 ThreadLocal 객체를 공유해도, 실제 값은 스레드 별로 독립적이다.

// ThreadLocal 선언
ThreadLocal<String> threadLocal = new ThreadLocal<>();

// 값 저장
threadLocal.set("Alice");

// 값 읽기
String user = threadLocal.get();
System.out.println(user); // Alice

// 값 제거 (필수는 아니지만 메모리 누수 방지를 위해 권장)
threadLocal.remove();

요약

  • 각 thread 에서 get() 을 호출 시 자기 스레드 전용 값이 나옴
  • 다른 스레드에서 set() 을 해도 다른 스레드 전용 값에는 영향을 안미침

이를 토대로 RequestContext 에 해당 인스턴스를 적용시키면,

public class RequestContext {
    private static final ThreadLocal<RequestContext> context =
        ThreadLocal.withInitial(RequestContext::new);

    private String currentUser;

    public static RequestContext get() {
        return context.get();
    }

    public String getCurrentUser() {
        return currentUser;
    }

    public void setCurrentUser(String user) {
        this.currentUser = user;
    }

    public static void clear() {
        context.remove();
    }
}

위와 같이 선언하면 스레드 별로 다른 값들을 관리할 수 있게 된다.

ExecutorService

이제 로컬 스레드의 저장소 관리를 보았으니, 처리하는 프로세스를 정의 할 차례이다.

순수 자바에서 ExecutorService 는 스레드 풀(Thread Pool)을 관리하고,
스레드 작업을 쉽게 실행할 수 있게 해주는 핵심 클래스이다.
여기에 작업을 제출하여 실행할 수 있게 된다.

예시

import java.util.concurrent.*;

public class ExecutorServiceExample {
    public static void main(String[] args) {

        // 1. 고정 스레드 풀 생성 (스레드 3개)
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // 2. 작업 제출
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " 처리중: Task-" + taskId);
            });
        }

        // 3. 종료
        executor.shutdown();
    }
}

위처럼 작업(Runnable, Callable)을 받아들여 작업 큐를 통해 3개의 스레드로 들어가 처리한다. 여기서 submit 은 비동기로 실행된다.

ExecutorServicesubmit() 된 task 들은 전부 Future 를 통해 값을 받을 수 있다.

Modern Java Concurrent 버전

하지만 위와 같이 여러 개의 작업을 실행 후에 Future 로 받는다고 치면 Future 이 여러개 있어야 한다. 이를 한 변수로 담아서 한 번에 처리하려면 다음 코드를 보자:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

public class CompletableFutureExample {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);

        System.out.println("작업 시작...");

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "에서 비동기 작업 실행 중...");
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 3000));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "작업 완료!";
        }, executor); // 사용 할 Executor를 명시적으로 지정

        future.thenAccept(result -> {
            System.out.println(Thread.currentThread().getName() + "에서 결과값 받음: " + result);
        });

        System.out.println("메인 스레드는 다른 작업 진행 중...");


        try {
            future.get(); // 작업이 끝날 때까지 기다림
        } catch (Exception e) {
            e.printStackTrace();
        }
        executor.shutdown();
        System.out.println("모든 작업 종료.");
    }
}

위 예제에서는 CompletableFuture 라는 클래스의

  • thenAccept 를 통해 값이 들어올 때마다 consume 되도록 하는 함수를 볼 수 있고,
  • supplyAsync 함수를 통해 값을 Supplier 를 통해 ExecutorService 에게 제공함을 볼 수 있다.
  • 만약 스레드 간의 각 task 를 처리 중에 있어 예외가 발생하면 자동적으로 스레드의 interrupt 값은 true -> false 로 초기화가 된다. 이때 상위 호출자에게 하위 호출자가 interrupt 되었다는 것을 알리기 위해 interrupt() 를 호출하여 여기에 Exception 이 터졌다는 것을 알려주게 된다.
allOf(), anyOf()
  • CompletableFuture.allOf() 인자로 받은 CompletableFuture 들에 대해 모든 결과가 성공적으로 완료됐을 때, CompletableFuture<Void> 를 생성하는 함수
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "결과1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "결과2");

// 모든 작업이 끝날 때까지 기다림
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);

// 모든 작업이 완료된 후에 실행
allFutures.thenRun(() -> {
    System.out.println("모든 작업이 완료되었습니다.");
});
  • CompletableFuture.anyOf() 인자로 받은 CompletableFuture 중에 하나라도 완료되면 해당 완료된 값을 담는 새로운 CompletableFuture<Object> 를 생성한다.
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(3000); } catch (Exception e) {}
    return "서버 A의 결과";
});
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(1000); } catch (Exception e) {}
    return "서버 B의 결과"; // 더 빨리 끝남
});

// 둘 중 하나라도 먼저 끝나는 것을 기다림
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(futureA, futureB);

// 가장 먼저 완료된 작업의 결과값을 사용
anyFuture.thenAccept(result -> {
    System.out.println("가장 먼저 완료된 결과: " + result);
});

이때 명시적 형변환이 불가피하다.

exceptionally()

CompletableFuture 에서 비동기 작업 중 예외가 발생했을 때 그 예외를 잡아서 처리하고, 정상적인 결과값으로 복구할 수 있게 해주는 메서드이다.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;

public class ExceptionallyExample {
    public static void main(String[] args) {
        // 1. 의도적으로 예외를 발생시키는 CompletableFuture
        CompletableFuture<String> futureWithException = CompletableFuture.supplyAsync(() -> {
            System.out.println("작업 시작: 예외를 발생시킵니다.");
            throw new RuntimeException("고의로 발생시킨 에러!");
        }, Executors.newSingleThreadExecutor());

        // 2. exceptionally()를 사용하여 예외 처리
        CompletableFuture<String> recoveredFuture = futureWithException.exceptionally(ex -> {
            System.out.println("예외를 잡았습니다: " + ex.getMessage());
            // 예외가 발생했을 때 대체할 값(정상 값)을 반환
            return "복구된 기본값"; 
        });

        // 3. thenAccept()로 최종 결과를 받아서 출력
        recoveredFuture.thenAccept(result -> {
            System.out.println("최종 결과: " + result);
        });
        
        // main 스레드가 종료되지 않도록 잠시 대기
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}