작업이나 스레드를 안전하고 빠르고 안정적으로 멈추게 하는 일은 어려운 일이다..
자바에서는 스레드를 멈추게 하는 방법이 없다. 대신 interrupt라는 방법을 사용할 수 있다.
작업 중단 시나리오
1. 사용자가 취소하기를 요청한 경우 : 사용자가 중간에 취소 버튼을 눌러 취소를 요청한 경우
2. 시간이 제한된 작업 : 일정한 시간 내 답이 될만한 결과를 계속 찾다 제한된 시간이 지나면 나온 결과중 가장 최적의 해에 가까운 값을 반환 하고 나머지 는 작업을 취소하는 경우
3. 애플리케이션 이벤트 : 원하는 결과를 얻기 위해 다양한 조건을 지정해 여러 작업을 동시에 실행시킨다. 특정 작업 결과를 얻었다면 나머지는 작업은 모두 취소하는 경우
4. 오류 : 예상치 못한 오류로 진행 중이던 작업을 취소하고 현재 진행중이던 작업이 무엇인지 기록하는 작업을 하는 경우
5. 종료 : 애플리케이션 종료 (새롭게 배포하기 위해 기존 애플리케이션을 종료 후 다시 실행) 작업 큐에 대기하던 작업을 마무리 하는 절차가 필요한 경우 (우아한 종료 라고 하기도 한다.)
생성자 - 소비자 패턴과 데드락 발생 가능성
생성자 - 소비자 패턴을 예로 생각해보자
만약 생성자가 소비자 보다 더 빠른 속도로 작업을 블락킹 큐에 놓는다고 한다면 큐는 곧 가득 찰 것이고 이런 상테에서 부하가 걸린 소비자가 대기중인 생성자의 작업을 취소 시킨다면 컨슈머는 더 이상 작업을 처리하지 못하기 때문에 데드락 상황이 벌어질 수 도 있다.
아래 코드를 확인해보자
import java.util.concurrent.*;
public class ProducerConsumerDeadlock {
private static final int QUEUE_CAPACITY = 5;
private static final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
private static volatile boolean running = true; // 플래그 변수
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
// 생산자 (Producer): 빠르게 데이터 추가
Runnable producer = () -> {
int i = 0;
try {
while (running) {
queue.put(i);
System.out.println("생성자: " + i + " 추가");
i++;
Thread.sleep(100); // 속도를 조절 (너무 빠르면 큐가 금방 찬다)
}
} catch (InterruptedException e) {
System.out.println("생성자 종료");
}
};
// 소비자 (Consumer): 작업 취소 (잘못된 방식으로)
Runnable consumer = () -> {
try {
while (running) {
Integer item = queue.take();
System.out.println("소비자: " + item + " 처리");
Thread.sleep(500); // 소비 속도가 느림 -> 큐가 점점 가득 참
if (item == 5) { // 특정 조건에서 소비자가 생성자 취소
System.out.println("소비자가 생성자 중단 요청");
running = false; // 잘못된 종료 처리
}
}
} catch (InterruptedException e) {
System.out.println("소비자 종료");
}
};
executor.execute(producer);
executor.execute(consumer);
Thread.sleep(5000); // 5초 동안 실행
executor.shutdownNow(); // 강제 종료
}
}
위 코드의 문제점은 다음과 같다
1. 생성자가 소비자 보다 빠르게 큐를 채움 -> queue.put()이 블록킹 됨
2. 소비자가 특정 조건에서 running 플래그를 false 로 변경 -> 생성자가 계속 대기 상태
3. 큐가 꽉 찼는데 소비자가 중단된다면 -> queue.put()은 영원히 블록킹 -> 데드락 발생
데드락 해결 -> 인터럽트
특정 스레드의 interrupt 메서드를 호출한다 해도 해당 스레드가 처리하던 작업을 멈추지 않는다, 단지 해당 스레드에게 인터럽트 요청이 있었다는 메시지를 전달할 뿐이다.
인터럽트를 이해하고자 할때 가장 중요한건 바로 실행중인 스레드에 실제적인 제한을 가해 멈추지 않게 하는 것이다.
단지 해당하는 스레드가 상황을 봐서 스스로 멈춰주기를 요청하는 것뿐이다.
스레드가 멈추기 좋은 상황을 취소 포인트 라고 하는데
1. wait
2. sleep
3. join
같은 메서드는 실제로 인터럽트 요청을 받거나 실행할 때 인터럽트 상태라고 지정했던 순간에 예외를 발생시킨다.
만약 작업 취소의 기능을 구현하는 것이라면 인터럽트가 가장 적절한 방법이라고 할 수 있다.
아래 코드는 인터럽트를 사용하여 안전하게 종료하는 코드
private static volatile boolean running = true;
Runnable producer = () -> {
int i = 0;
try {
while (!Thread.currentThread().isInterrupted()) {
queue.put(i);
System.out.println("생성자: " + i + " 추가");
i++;
Thread.sleep(100);
}
} catch (InterruptedException e) {
System.out.println("생성자 종료 (Interrupted)");
}
};
조건문에 스레드 상태를 통해 종료하게끔 코드 수정
인터럽트 정책 #1 사용자가 요청한 경우 인터럽트를 사용하자
스레드 인터럽트 처리 정책은 필요하다. 인터럽트 처리 정책은 인터럽트 요청이 들어왔을 때, 해당 스레드가 인터럽트를 어떻게 처리할 것인지 대한 지침이다.
인터럽트 대비에 단일 연산으로 보호할 수 있는 범위가 어디 까지인지, 인터럽트가 발생했을 때 해당하는 인터럽트에 어떻게 재빠르게 대응할것인지 지침을 뜻한다.
더 자세히 시작하기 전에 명확하게 구분할 내용이 있다.
작업(task) vs 스레드 (실행단위) 인터럽트 구분
스레드는 실행을 담당하는 실행단위
작업은 실행될 코드의 논리적 단위
스레드 풀을 사용할 경우, 작업을 직접 실행하는 주체가 아니라 스레드 풀에서 제공한 스레드를 사용함.
즉 인터럽트를 발생시킬 때
스레드에 거는것인지, 작업을 중단시키는것인지 명확하게 해야한다.
아래 코드는 인터럽트가 작업과 스레드에 어떻게 영향을 미치는가에 대한 코드이다.
import java.util.concurrent.*;
public class InterruptExample {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2); // 스레드 풀 생성
Runnable task = () -> {
try {
while (!Thread.currentThread().isInterrupted()) { // 인터럽트 체크
System.out.println(Thread.currentThread().getName() + " 작업 실행 중...");
Thread.sleep(1000); // 작업 실행
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 작업 인터럽트 감지 후 종료.");
Thread.currentThread().interrupt(); // 상태 복구 (중요!)
}
};
Future<?> future = executor.submit(task); // 스레드 풀에서 실행
Thread.sleep(3000); // 3초 후 인터럽트 요청
System.out.println("작업을 취소합니다.");
future.cancel(true); // 실행 중인 작업에 인터럽트 요청
Thread.sleep(2000); // 기다림
executor.shutdownNow(); // 모든 스레드 종료 요청
}
}
1. 작업 (Runnable task) 은 스레드 풀의 스레드에서 실행됨.
2. 스레드 풀에서 실행된 작업이 인터럽트를 감지하고 종료됨.
3. future.cancel(true)을 호출하면 현재 실행 중인 작업(task)에 인터럽트를 발생시킴.
4. 작업이 InterruptedException을 받으면, Thread.currentThread().interrupt()를 호출해서 스레드의 인터럽트 상태를 유지함.
5. 마지막에 executor.shutdownNow()로 스레드 풀 자체를 종료시킴.
• 작업에 대한 인터럽트
→ future.cancel(true)는 작업을 실행하는 스레드에 인터럽트를 보냄
→ 현재 실행 중인 작업이 취소됨
• 스레드 풀 자체에 대한 인터럽트
→ executor.shutdownNow()는 모든 스레드에 인터럽트를 보내고 종료
• 작업이 스레드를 직접 소유하지 않기 때문에, 스레드의 인터럽트 상태를 복구해야 함
→ Thread.currentThread().interrupt(); 사용
Thread.currentThread().interrupt(); // 🔥 인터럽트 상태 복구 (중요!)
인터럽트 상태를 복구하는 내용은 중요하다 그 이유는 다음과 같다
1. 작업은 Thread.sleep(1000)을 사용하므로, 인터럽트가 발생하면 InterruptedException이 발생함.
2. InterruptedException을 처리하는 순간, JVM이 해당 스레드의 인터럽트 상태를 자동으로 초기화(false로 변경)함.
3. 그런데 이 스레드는 스레드 풀에 의해 재사용되므로, 인터럽트 상태가 초기화되면 나중에 인터럽트를 감지할 수 없다
4. 따라서 Thread.currentThread().interrupt();를 호출해서 인터럽트 상태를 다시 설정해야 한다.
5. 그래야 스레드 풀이 해당 인터럽트 상태를 보고 추가적인 처리를 할 수 있음.
주의 *
각 스레드는 각자의 인터럽트 정책을 갖고 있다.
따라서 해당 스레드에서 인터럽트 요청을 받았을때 어떻게 동작할지 정확하게 알고 있지 않는 경우에는 함수로 인터럽트를 걸어서는 안된다.
인터럽트 정책 #2 실행 중간 필요없는 작업은 Future.get() 메서드를 활용해 취소하자
Future의 get() 인터럽트 정책
1. future.get(timeout, unit)을 사용하면 시간 내 작업이 끝나지 않으면 TimeoutException 발생한다
이후 future.cancel(true)을 호출 하면 작업이 실행중인 스레드에 인터럽트가 걸려 강제종료를 시도한다.
import java.util.concurrent.*;
public class FutureTimeoutExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
Callable<Integer> longRunningTask = () -> {
try {
System.out.println(Thread.currentThread().getName() + " - 작업 시작...");
Thread.sleep(5000); // 5초 동안 실행되는 작업
return 42; // 정상 종료 시 반환 값
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " - 인터럽트 발생! 작업 중단.");
return null; // 인터럽트 발생 시 null 반환
}
};
Future<Integer> future = executor.submit(longRunningTask);
try {
System.out.println("메인 스레드: 2초 동안 작업을 기다립니다...");
Integer result = future.get(2, TimeUnit.SECONDS); // ⏳ 2초만 기다림 (TimeoutException 발생 가능)
System.out.println("작업 완료! 결과: " + result);
} catch (TimeoutException e) {
System.out.println("⏳ 시간 초과! 작업을 취소합니다...");
future.cancel(true); // 실행 중인 작업에 인터럽트 요청
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executor.shutdown();
}
}
인터럽트 정책 #4 인터럽트에 응답하지 않는 블록킹 작업 다루기
자바 라이브러리에 포함된 여러 블록킹 메서드는 대부분 인터럽트가 발생하는 즉시 멈추면서 interrputedException을 띄우도록
되어 있으며, 따라서 작업 중단 요청에 적정하게 대응 하는 작업을 쉽게 구현할 수 있다.
그러나 특정 블록킹 메서드가 인터럽트에 대응하지 않게 되었는데
여기서는 응답하지 않는 블록킹 메서드 라고 칭하면 해당 메서드는 다음과 같다
- 패키지의 동기적 소켓 I/O : 서버 애플리케이션에서 가장 대표적인 블록킹 I/O 의 예는 소켓에서
데이터를 읽어오거나, 데이터를 쓰는 부분이다.
InputStream 의 read 메서드, OutputStream 의 write 메서드는 인터럽트에 반응하지 않는다
하지만 해당 스트림의 연결된 소켓을 직접 닫으면 대기중이던 read, write 메서드가 중단되면서 SocketException이 발생한다. - 패키지와 동기적 I/O : InterrputibleChannel 에서 대기하던 스레드에 인터럽트를 걸면 CloseByInterrputException이 발생하면서 해당 채널이 닫힌다.
채널이 닫히면 작업을 실행하던 스레드에서 AsynronousCloseException 이 발생한다. (대부분 표준 채널은 모두 InterrputibleException을 구현한다.) - Selector를 사용한 비동기적 I/O : 스레드가 Selector 클래스의 select 메서드에서 대기중인 경우, close 메서드를 호출하면 CloseSelectorException을 발생시키면서 즉시 리턴한다.
- 락 확보 : 스레드가 암묵적인 락을 확보하기 위해 대기상태에 들어가 있는 경우 언젠가 락을 확보할 수 있을것이라는 보장 하지 못하며, 어떤 방법으로도 다음 상태로 진행시켜 스레드의 주의를 끌 수 없기 때문에 이렇게 해 볼 방법이 없다. 하지만 Lock 인터페이스를 구현한 락 클래스의 lockInterrputibly 메서드를 활용하면 락을 확보할 때 까지 대기하면서 인터럽트에도 응답하도록 구현할 수 있다.
인터럽트에 응답하지 않는 블록킹 소켓 처리
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class InterruptibleSocketTask extends Thread {
private ServerSocket serverSocket;
public InterruptibleSocketTask(int port) throws IOException {
this.serverSocket = new ServerSocket(port);
}
@Override
public void run() {
try {
System.out.println("🚀 서버가 클라이언트 연결을 기다리는 중...");
Socket socket = serverSocket.accept(); // ❌ 블로킹 (클라이언트 연결 대기)
System.out.println("✅ 클라이언트 연결됨: " + socket);
// 클라이언트와의 I/O 작업 (예: read(), write())
socket.getInputStream().read(); // ❌ 블로킹 (데이터 읽기 대기)
} catch (IOException e) {
System.out.println("⛔ 서버 소켓이 닫혀서 accept()가 중단됨: " + e.getMessage());
}
}
@Override
public void interrupt() {
try {
System.out.println("🔴 서버 소켓 종료 요청!");
serverSocket.close(); // ✅ 소켓 닫아서 블로킹 해제
} catch (IOException e) {
e.printStackTrace();
}
super.interrupt(); // 부모의 interrupt() 호출하여 스레드 상태도 변경
}
public static void main(String[] args) throws IOException, InterruptedException {
InterruptibleSocketTask serverTask = new InterruptibleSocketTask(8080);
serverTask.start();
Thread.sleep(3000); // 3초 후 서버 중단 시도
serverTask.interrupt(); // ✅ 안전한 종료 요청
}
}
인터럽트 정책 #5 비정상적인 스레드 종료 상황 처리
비정상적으로 스레드가 종료 시 스택 트레이스를 콘솔에 출력하는 경우가 있겠지만, 아무도 콘솔을 모니터링 하지 않는 경우도 있고, 오류 때문에 스레드가 멈춘 경우에도 애플리케이션은 마치 오류 없이 동작하는 것 처럼 보일 수 있다.
이번 챕터는 스레드에서 오류가 발생하는 발행해 멈추지 않도록 예방할 수 있고, 멈춘 스레드를 찾아내는 방법이 있다.
스레드를 예상치 못하게 종료하는 가장 큰 원인은 RuntimeExceptoin이다.
작업 처리하는 스레드는 실행할 작업에 대해서 try - catch 구문 내부에서 실행해 예상치 못한 예외 상황에 대응할 수 있도록 준비하거나 try-finllay 구문에서 스레드가 피치 못할 사정으로 종료되는 경우에 외부에 종료된다는 사실을 알려야 한다.
try-catch 구문으로 예외 전파
public void run() {
Throwable thrown = null;
try {
while (!isInterrupted()) {
runTask(getTaskFromWorkQueue());
}
} catch (Throwable e) {
thrown = e;
} finally {
threadExited(this, thrown);
}
}
ThreadPollExecutor 클래스 afterExecute 메서드 오버라이드 후 오류 상황 전파
import java.util.concurrent.*;
public class ExceptionTrackingExecutor extends ThreadPoolExecutor {
public ExceptionTrackingExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t); // 기본 동작 유지
if (t == null && r instanceof Future<?>) {
try {
Future<?> future = (Future<?>) r;
if (future.isDone()) {
future.get(); // 예외 발생 여부 확인
}
} catch (CancellationException e) {
t = e; // 작업이 취소됨
} catch (ExecutionException e) {
t = e.getCause(); // 실행 중 예외 발생
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 인터럽트 상태 복구
}
}
// ✅ 예외 발생 시 로깅
if (t != null) {
System.err.println("⛔ 작업 중 예외 발생: " + t.getMessage());
t.printStackTrace();
}
}
public static void main(String[] args) {
ExecutorService executor = new ExceptionTrackingExecutor(2, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
// ✅ 정상 작업
executor.submit(() -> {
System.out.println("🚀 정상 작업 실행 중...");
});
// ✅ 예외 발생하는 작업
executor.submit(() -> {
throw new RuntimeException("💥 작업 실패!");
});
executor.shutdown();
}
}
마치며
작업, 스레드, 서비스, 애플리케이션 등이 할 일을 모두 마치고 종료되는 시점을 적절하게 관리하면 프로그램이 훨씬 복잡해진다.
자바에서는 선점적으로 작업을 중단하거나 스레드를 종료시킬 수 있는 방법을 제공하지 않는다. 그 대신 인터럽트라는 방법을 사용해 스레드 간의 협력 과정을 걸쳐 작업 중단 기능을 구현하도록 하고 있으며, 작업 중단 기능을 구현하고 전체 프로그램에 일관적으로 적용하는 일은 모두 개발자의 몫이다. FutureTask, Executor 등의 프레임워크를 사용하면 작업이나, 서비스를 실행 도중에 중단할 수 있는 기능을 쉽게 구현할 수 있을것이다.
'it 서적 독후감 > 자바 병렬 프로그래밍' 카테고리의 다른 글
| 자바 ExcutorService 프레임웍의 ExcutorCompletionService 활용 (0) | 2025.03.08 |
|---|---|
| Java 병렬프로그래밍 chap 1 (4) | 2025.02.24 |