RunnablePassthrough 이해하기

LCEL(LangChainExpressionLanguage) 을 공부하다 보면 RunnablePassgthrough라는 것이 종종 등장한다.

Passthrough라는 이름값에 맞게 입력 받은 (주로 runnable 객체에 invoke 메서드를 호출하여 invoke({“num” : 1}) 과 같이 딕셔너리를 입력으로 받는다.) 딕셔너리를 그대로 전달한다.

일반적으로 RunnableParaellel과 함께 사용된다.

데이터를 맵의 새로운 키에 할당하는데 사용된다고 한다.

from langchain_core.runnables import RunnableParallel, RunnablePassthrough

runnable = RunnableParallel( #병렬처리
    # 전달된 입력을 그대로 반환하는 Runnable을 설정합니다.
    passed=RunnablePassthrough(),
    # 입력의 "num" 값에 3을 곱한 결과를 반환하는 Runnable을 설정합니다.
    extra=RunnablePassthrough.assign(mult=lambda x: x["num"] * 3),
    # 입력의 "num" 값에 1을 더한 결과를 반환하는 Runnable을 설정합니다.
    modified=lambda x: x["num"] + 1,
)

# {"num": 1}을 입력으로 Runnable을 실행합니다.
runnable.invoke({"num": 1}) #딕셔너리가 그대로 전달

와 같이 되어있다면 RunnableParallel은 병렬로 실행 가능한 작업을 정의한다.

passed 속성에는 RunnablePassthrough 인스턴스를 할당하여 입력을 그대로 반환하고

extra 속성에는 RunnablePassthrough.assign() 메서드를 사용하여 입력의 “num”값에 3을 곱한 결과를 “mult”키에 할당한다.

근데 해당 코드를 보면 약간 의아한 부분이 있다.

왜 extra = RunnablePassthrough.assign(mult=lambda x: x[“num”] * 3) 라고 썼을까?

extra = passed.assign(mult = lambda x : x[“num”] * 3) 이 더 맞지 않나? 라는 의문이 들기도 한다.

이는 불가능한데 이유는 다음과 같다.

RunnableParallel은 딕셔너리에 정의된 모든 Runnable을 병렬로 실행하며, 각 Runnable은 동일한 원본 입력({“num” : 1})을 받기 때문이다.

즉, passed 결과가 extra의 입력으로 들어가는 것이 아니다.


분류RunnablePassthrough.assign(…)RunnablePassthrough()
방식RunnablePassthrough 클래스 자체에 정의된 assign이라는 클래스 메서드(혹은 정적 메서드)를 호출하는 것이다.

이 메서드의 역할은 입력(딕셔너리)을 그대로 통과시키면서 동시에 새로운 키-값 쌍을 계산하여 원본 딕셔너리에 추가(assign)하는 새로운 Runnable을 생성하는 것이다.
RunnblePassthrough 클래스의 인스턴스(객체)를 생성한다.

이 객체의 역할은 입력을 받아서 그대로 반환하는 것이다.

그래서 passed 브랜치는 입력받은 {“num” : 1}을 그대로 반환한다.

아직도 뭔가 모르겠다. 그럼 다음을 보자.

일단 RunnableParalle에 대한 이해가 필요하다 우선 속성값 passed , extra, modified 이름을 잘 지어놔서 마치 langchain 재단 소속 개발자가 개발 시점에서 정해놓은 파라미터 명인 것처럼 보이나. 전혀 아니다.

from langchain_core.runnables import RunnableParallel, RunnablePassthrough

runnable = RunnableParallel(
     # 입력 전체를 그대로 반환
     blahblah_A = RunnablePassthrough(),
     # 입력 전체를 그대로 반환
     blahblah_B = RunnablePassthrough(),
     # 입력에서 "num"을 찾아 3을 곱하고, 입력 전체에 'mult' 키를 추가
     out_A = RunnablePassthrough.assign(mult = lambda x : x["num"] * 3),
     # 입력에서 "str"을 찾아 길이를 재고, 입력 전체에 'str_len' 키를 추가
     out_B = RunnablePassthrough.assign(str_len = lambda  x : len(x["str"])),
)

우선 다음과 같은 코드를 짜보겠다.

# 'num'과 'str'을 모두 포함하는 *단일* 딕셔너리 입력
input_data = {"num": 1, "str": "abcdefg"}

# invoke 호출
result = runnable.invoke(input_data)

그리고 상기의 코드로 테스트 해본다.

다음과 같이 나온다.

보면 result 에 대한 결과는

{'blahblah_A': {'num': 1, 'str': 'abcdefg'}, 'blahblah_B': {'num': 1, 'str': 'abcdefg'}, 'out_A': {'num': 1, 'str': 'abcdefg', 'mult': 3}, 'out_B': {'num': 1, 'str': 'abcdefg', 'str_len': 7}}

와 같이 나오고 있는데

blahblah_A와 blahbalh_B에 다음과 같이 인수로 제공한 딕셔너리가 할당되고

out_A에는 RunnablePassthroug.assign(mult = lambda x : x[“num”] *3) 으로 되어 있으므로
‘out_A’: {‘num’: 1, ‘str’: ‘abcdefg’, ‘mult’: 3} 으로 나오고

out_B에는 RunnablePassthrough.assign(str_len = lambda x : len(x[“str”])) 으로 되어 있으므로

‘out_B’:{‘num’: 1, ‘str’: ‘abcedfg’, ‘str_len’ : 7} 으로 나온다.

즉, 안에 들어갈 인수명 따위는 개발자가 임의로 정해서 쓰는 것이다.

이걸 이해하면 편하다.


RunnablePassthrough를 안쓰고 구현하는 방법도 있다.

바로 딕셔너리 언패킹을 사용하는 것이다.



from langchain_core.runnables import RunnableParallel, RunnablePassthrough

runnable = RunnableParallel(
     blahblah_A = RunnablePassthrough(),
     blahblah_B = RunnablePassthrough(),
     
     # assign 대신 람다 함수로 직접 구현
     # "입력 딕셔너리 x의 모든 내용을 복사하고, 'mult' 키를 추가한다"
     out_A = lambda x: {**x, "mult": x["num"] * 3},
     
     # "입력 딕셔너리 x의 모든 내용을 복사하고, 'str_len' 키를 추가한다"
     out_B = lambda x: {**x, "str_len": len(x["str"])},
)

# 테스트
input_data = {"num": 1, "str": "abcdefg"}
print(runnable.invoke(input_data))

또 RunnalbeParallel에서 RunnablePassthough가 반드시 있어야 하는 것도 아니다.

branches = {}
for i in range(100):
    # 각 브랜치는 입력된 숫자에 i를 더함
    branches[f"branch_{i}"] = (lambda i_val: lambda x: x["num"] + i_val)(i)

와 같은 코드가 있다.

runnable_100 = RunnableParallel(branches)

이를 그대로 RunnableParallel에 때려 박아 넣고

runnable_100.invoke({"num": 1})

을 실행한다.

그럼 결과는 아래와 같다.

{'branch_0': 1,
 'branch_1': 2,
 'branch_2': 3,
 'branch_3': 4,
 'branch_4': 5,
 'branch_5': 6,
 'branch_6': 7,
-- 중략 --
 'branch_95': 96,
 'branch_96': 97,
 'branch_97': 98,
 'branch_98': 99,
 'branch_99': 100}

다른 숫자로 테스트해도 위와 같이 나온다.


이를 설명하려면 동시성 이라는 개념을 알 필요가 있다.

RunnableParallel : 100개의 주문서(작업)을 받은 총주방장 이다.

ThreadPoolExcecutor : 실제 요리를 하는 ‘주방 그 자체이다.

max_workers (최대 스레드) : 주방에서 일하는 ‘요리사의 수이다.

총주방장(RunnableParallel)이 100개의 주문을 받았다.. 하지만 주방에는 요리사(max_workers)가 5명뿐이다.

총주방장은 일단 5명의 요리사에게 주문 5개를 나눠준다. 요리사가 요리를 마치면(작업 완료), 총주방장은 그 요리사에게 6번째 주문을, 다른 요리사가 끝나면 7번째 주문을 준다.

이것이 RunnableParallel의 기본 작동 방식이다. 100개가 ‘이론상’ 동시에 시작되지만, ‘물리적’으로는 요리사 수(스레드 풀 크기)만큼만 동시에 처리된다.

따라서 이런 RunnableParaller이 빛을 발하는 분야는 API 호출, DB 조회이다.

이제 다음 예제를 따라해보자.


with open("yain.txt", "r", encoding="UTF-8") as f:
    yain_text = f.read()
    
print(yain_text)


from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# 텍스트로부터 FAISS 벡터 저장소를 생성합니다.
vectorstore = FAISS.from_texts(
    [yain_text,
    ],
    embedding=OpenAIEmbeddings(),
)
# 벡터 저장소를 검색기로 사용합니다.
retriever = vectorstore.as_retriever()
# 템플릿을 정의합니다.
template = """ 다음 
{context} 을 참고하여 다음 질문을 답변해세요 -> 

Question: {question}
"""
# 템플릿으로부터 채팅 프롬프트를 생성합니다.
prompt = ChatPromptTemplate.from_template(template)

# ChatOpenAI 모델을 초기화합니다.
model = ChatOpenAI(model_name="gpt-4o-mini")


# 문서를 포맷팅하는 함수
def format_docs(docs):
    return "\n".join([doc.page_content for doc in docs])


# 검색 체인을 구성합니다.
retrieval_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

# 검색 체인을 실행하여 질문에 대한 답변을 얻습니다.
retrieval_chain.invoke("심영이 말하는 '님'은 무엇이고 그에 대한 김두한의 생각을 답하시오")

‘심영이 말하는 \’님\’은 사회주의 국가를 기리며 표현한 개념으로, 그리운 이름이자 수많은 사람들에게 희망과 이상을 상징하는 존재로 묘사됩니다. 연극 \’님\’에서 심영은 \’님\’을 사회주의 낙원으로 해석하며, 이념적이고 정치적인 메시지를 전달하려고 합니다. \n\n김두한의 생각은 이와 정반대입니다. 그는 공산당의 선전과 학생 및 시민들을 속이는 행위를 강하게 반대하며, 심영의 연극을 비판적으로 바라봅니다. 그는 직접적으로 무대에 나서서 심영을 향해 “개소리 집어치워!”라고 외치며, 공산당을 선전하는 내용이 허위이며 사람들을 우롱하고 있다고 강하게 반박합니다. 김두한은 사회주의를 지지하거나 동조하는 것이 아닌, 오히려 그에 대한 저항의 목소리를 높이고 있습니다.’

결과가 다음과 같이 나온다.

따라서 다음과 같이 값을 바로 전달할 수 있는 것이 RunnablePassthrough이다.