Channel
개요
Channel은 CSP(Communicating Sequential Processes) 스타일의 스레드 간 메시지 전달을 제공합니다. Bounded(제한 용량) 및 Unbounded(무제한) 채널, 멀티 채널 select를 지원하며, Mutex/Condvar로 구현됩니다.
Quick Start
U std/channel
F main() -> i64 {
ch := channel_new(10)
ch.send(42)
value := ch.recv()
print_i64(value) # 42
R 0
}
API 요약
| 함수 | 설명 |
|---|---|
channel_new(capacity) | Bounded 채널 생성 |
unbounded_channel_new() | Unbounded 채널 생성 |
send(value) | 메시지 전송 (블로킹) |
recv() | 메시지 수신 (블로킹) |
try_send(value) | 논블로킹 전송 (실패 시 0 반환) |
try_recv() | 논블로킹 수신 (없으면 0 반환) |
channel_select(chans, count) | 여러 채널 중 하나 선택 |
close() | 채널 닫기 |
실용 예제
예제 1: Producer-Consumer 패턴
U std/channel
U std/thread
F producer(ch_ptr: i64) -> i64 {
sender := load_typed(ch_ptr)
i := 0
L i < 10 {
sender.send(i)
print_str("생산: ~{i}")
thread_sleep(100)
i = i + 1
}
sender.send(-1) # 종료 신호
R 0
}
F consumer(ch_ptr: i64) -> i64 {
receiver := load_typed(ch_ptr)
L 1 {
value := receiver.recv()
I value < 0 { B } # 종료
print_str("소비: ~{value}")
thread_sleep(200)
}
R 0
}
F main() -> i64 {
ch := channel_new(5)
thread_spawn(producer, &ch.sender)
thread_spawn(consumer, &ch.receiver)
thread_sleep(3000)
R 0
}
예제 2: 작업 큐 (Work Queue)
U std/channel
U std/thread
U std/vec
S Task {
id: i64,
data: i64
}
F worker(ch_ptr: i64, worker_id: i64) -> i64 {
receiver := load_typed(ch_ptr)
L 1 {
task := receiver.recv()
I task.id == 0 { B } # 종료 신호
print_str("워커 ~{worker_id} 작업 ~{task.id}")
thread_sleep(500) # 작업 시뮬레이션
}
R 0
}
F main() -> i64 {
ch := channel_new(20)
# 3개 워커 스레드 생성
i := 0
L i < 3 {
thread_spawn(worker, &ch.receiver, i)
i = i + 1
}
# 10개 작업 전송
i = 1
L i <= 10 {
task := Task { id: i, data: i * 100 }
ch.sender.send(task)
i = i + 1
}
# 종료 신호 (워커 수만큼)
i = 0
L i < 3 {
ch.sender.send(Task { id: 0, data: 0 })
i = i + 1
}
thread_sleep(5000)
R 0
}
예제 3: 채널 Select (멀티플렉싱)
U std/channel
F main() -> i64 {
ch1 := channel_new(1)
ch2 := channel_new(1)
# 두 채널에 데이터 전송
spawn {
thread_sleep(100)
ch1.sender.send(10)
}
spawn {
thread_sleep(200)
ch2.sender.send(20)
}
# 먼저 도착하는 메시지 수신
channels := Vec::new()
channels.push(&ch1.receiver)
channels.push(&ch2.receiver)
idx := channel_select(channels.data, 2)
I idx == 0 {
print_str("ch1 수신: ~{ch1.receiver.recv()}")
} E I idx == 1 {
print_str("ch2 수신: ~{ch2.receiver.recv()}")
}
R 0
}
예제 4: Unbounded 채널 (동적 크기)
U std/channel
F fast_producer(ch_ptr: i64) -> i64 {
sender := load_typed(ch_ptr)
i := 0
L i < 1000 {
sender.send(i) # 버퍼 full 걱정 없음
i = i + 1
}
R 0
}
F slow_consumer(ch_ptr: i64) -> i64 {
receiver := load_typed(ch_ptr)
i := 0
L i < 1000 {
value := receiver.recv()
thread_sleep(10) # 느린 처리
i = i + 1
}
R 0
}
F main() -> i64 {
ch := unbounded_channel_new()
thread_spawn(fast_producer, &ch.sender)
thread_spawn(slow_consumer, &ch.receiver)
thread_sleep(15000)
R 0
}
예제 5: 논블로킹 송수신
U std/channel
F main() -> i64 {
ch := channel_new(2)
# 논블로킹 전송
success := ch.sender.try_send(10)
print_i64(success) # 1 (성공)
ch.sender.try_send(20)
ch.sender.try_send(30) # 버퍼 full (capacity=2)
success = ch.sender.try_send(40)
print_i64(success) # 0 (실패)
# 논블로킹 수신
value := ch.receiver.try_recv()
I value != 0 {
print_i64(value) # 10
}
R 0
}
주의사항
1. Deadlock 위험
모든 Sender가 닫히지 않으면 Receiver가 영원히 블로킹됩니다.
# Deadlock 발생
ch := channel_new(1)
value := ch.receiver.recv() # 영원히 대기 (아무도 send 안 함)
# 해결: 타임아웃 또는 별도 스레드에서 send
spawn { ch.sender.send(42) }
value = ch.receiver.recv()
2. Bounded vs Unbounded
- Bounded: 고정 용량, 버퍼 full 시 sender 블로킹, 메모리 예측 가능
- Unbounded: 무제한 용량, sender 절대 블로킹, 메모리 폭발 위험
# Bounded: 백프레셔(Backpressure) 제공
ch := channel_new(10)
L i < 1000 {
ch.send(i) # 버퍼 full이면 대기 (consumer가 따라잡을 때까지)
}
# Unbounded: 메모리 누수 위험
ch := unbounded_channel_new()
L i < 1000000 {
ch.send(i) # 계속 누적 (consumer가 느리면 메모리 부족)
}
3. 채널 닫기
close()를 호출하면 더 이상 send할 수 없습니다. Receiver는 남은 메시지를 모두 수신 후 0 반환합니다.
ch := channel_new(5)
ch.send(1)
ch.send(2)
ch.close()
# ch.send(3) # 패닉 또는 무시됨
print_i64(ch.recv()) # 1
print_i64(ch.recv()) # 2
print_i64(ch.recv()) # 0 (채널 닫힘)
4. Select의 공정성
channel_select()는 첫 번째 준비된 채널을 선택합니다. 여러 채널이 동시에 준비되면 인덱스 순서대로 우선합니다.
# ch1과 ch2 모두 준비 → ch1 선택됨 (인덱스 0)
idx := channel_select(&[ch1, ch2], 2)
5. 메모리 관리
Channel 내부 버퍼는 동적 할당됩니다. 프로그램 종료 전 free()를 호출하거나 GC에 의존하세요.
6. MPMC (Multiple Producer Multiple Consumer)
Vais Channel은 MPMC를 지원합니다. 여러 스레드가 동시에 send/recv 가능합니다.
ch := channel_new(10)
# 3개 producer
L i < 3 {
spawn { L j < 100 { ch.send(j) } }
}
# 2개 consumer
L i < 2 {
spawn { L j < 150 { ch.recv() } }
}
7. 타임아웃 패턴
현재 구현은 타임아웃 내장 함수가 없습니다. Select와 timer 채널을 조합하세요.
F recv_with_timeout(ch: &Receiver<i64>, ms: i64) -> i64? {
timer := channel_new(1)
spawn {
thread_sleep(ms)
timer.send(0) # 타임아웃 신호
}
idx := channel_select(&[ch, &timer.receiver], 2)
I idx == 0 {
R Some(ch.recv())
} E {
R None # 타임아웃
}
}
8. 채널 용량 0 (Rendezvous)
Capacity=0인 채널은 Sender와 Receiver가 동시에 만날 때만 전송됩니다.
ch := channel_new(0) # Rendezvous channel
spawn { ch.send(42) } # Receiver 대기
value := ch.recv() # Sender와 동기화