<НА ГЛАВНУЮ

От нуля до чат-бота: модульный Conversational AI с Pipecat и HuggingFace

Пошаговое руководство по созданию модульного conversational AI-агента на Pipecat с интеграцией HuggingFace DialoGPT и полными примерами кода.

Обзор

В этом руководстве показано, как создать модульного conversational AI-агента с помощью фреймворка Pipecat и текст-генерации HuggingFace. Пример демонстрирует, как связать кастомные FrameProcessor-классы в Pipeline, симулировать ввод пользователя, запускать асинхронный поток данных с PipelineRunner и PipelineTask и отображать диалог. В примере используется DialoGPT-small для демонстрации, а архитектура Pipecat упрощает добавление функций вроде голосового ввода или памяти.

Установка и импорт

Сначала установите необходимые библиотеки и импортируйте компоненты Pipecat и HuggingFace. Точный код настройки приведён ниже:

!pip install -q pipecat-ai transformers torch accelerate numpy
 
 
import asyncio
import logging
from typing import AsyncGenerator
import numpy as np
 
 
print(" Checking available Pipecat frames...")
 
 
try:
   from pipecat.frames.frames import (
       Frame,
       TextFrame,
   )
   print(" Basic frames imported successfully")
except ImportError as e:
   print(f"  Import error: {e}")
   from pipecat.frames.frames import Frame, TextFrame
 
 
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
 
 
from transformers import pipeline as hf_pipeline
import torch

Этот код загружает основные элементы Pipecat (Pipeline, PipelineRunner, PipelineTask, FrameProcessor) и HuggingFace pipeline для генерации текста. После этого можно реализовать процессоры для обработки текста и отображения.

SimpleChatProcessor: генерация ответов моделью

SimpleChatProcessor загружает модель HuggingFace (DialoGPT-small) и хранит простую историю разговора в строке, чтобы обеспечить контекст между ходами. При получении TextFrame с пользовательским вводом он формирует входную строку, вызывает HuggingFace pipeline, очищает ответ модели, обновляет историю и отправляет TextFrame с ответом AI дальше по конвейеру.

Реализация класса в примере:

class SimpleChatProcessor(FrameProcessor):
   """Simple conversational AI processor using HuggingFace"""
   def __init__(self):
       super().__init__()
       print(" Loading HuggingFace text generation model...")
       self.chatbot = hf_pipeline(
           "text-generation",
           model="microsoft/DialoGPT-small",
           pad_token_id=50256,
           do_sample=True,
           temperature=0.8,
           max_length=100
       )
       self.conversation_history = ""
       print(" Chat model loaded successfully!")
 
 
   async def process_frame(self, frame: Frame, direction: FrameDirection):
       await super().process_frame(frame, direction)
       if isinstance(frame, TextFrame):
           user_text = getattr(frame, "text", "").strip()
           if user_text and not user_text.startswith("AI:"):
               print(f" USER: {user_text}")
               try:
                   if self.conversation_history:
                       input_text = f"{self.conversation_history} User: {user_text} Bot:"
                   else:
                       input_text = f"User: {user_text} Bot:"
 
 
                   response = self.chatbot(
                       input_text,
                       max_new_tokens=50,
                       num_return_sequences=1,
                       temperature=0.7,
                       do_sample=True,
                       pad_token_id=self.chatbot.tokenizer.eos_token_id
                   )
 
 
                   generated_text = response[0]["generated_text"]
                   if "Bot:" in generated_text:
                       ai_response = generated_text.split("Bot:")[-1].strip()
                       ai_response = ai_response.split("User:")[0].strip()
                       if not ai_response:
                           ai_response = "That's interesting! Tell me more."
                   else:
                       ai_response = "I'd love to hear more about that!"
 
 
                   self.conversation_history = f"{input_text} {ai_response}"
                   await self.push_frame(TextFrame(text=f"AI: {ai_response}"), direction)
               except Exception as e:
                   print(f"  Chat error: {e}")
                   await self.push_frame(
                       TextFrame(text="AI: I'm having trouble processing that. Could you try rephrasing?"),
                       direction
                   )
       else:
           await self.push_frame(frame, direction)

Этот процессор показывает практический подход к поддержанию контекста и постобработке вывода модели для получения читаемого ответа.

TextDisplayProcessor и ConversationInputGenerator

TextDisplayProcessor форматирует и печатает ответы AI и считает число завершённых обменов. ConversationInputGenerator генерирует серию демонстрационных TextFrame-сообщений с задержками, чтобы имитировать пользователя. Код модулей ниже:

class TextDisplayProcessor(FrameProcessor):
   """Displays text frames in a conversational format"""
   def __init__(self):
       super().__init__()
       self.conversation_count = 0
 
 
   async def process_frame(self, frame: Frame, direction: FrameDirection):
       await super().process_frame(frame, direction)
       if isinstance(frame, TextFrame):
           text = getattr(frame, "text", "")
           if text.startswith("AI:"):
               print(f" {text}")
               self.conversation_count += 1
               print(f"     Exchange {self.conversation_count} complete\n")
       await self.push_frame(frame, direction)
 
 
 
class ConversationInputGenerator:
   """Generates demo conversation inputs"""
   def __init__(self):
       self.demo_conversations = [
           "Hello! How are you doing today?",
           "What's your favorite thing to talk about?",
           "Can you tell me something interesting about AI?",
           "What makes conversation enjoyable for you?",
           "Thanks for the great chat!"
       ]
 
 
   async def generate_conversation(self) -> AsyncGenerator[TextFrame, None]:
       print(" Starting conversation simulation...\n")
       for i, user_input in enumerate(self.demo_conversations):
           yield TextFrame(text=user_input)
           if i < len(self.demo_conversations) - 1:
               await asyncio.sleep(2)

Эти компоненты отделяют логику отображения и генерации ввода от инференса модели, что облегчает тестирование и расширение.

Сборка агента и запуск демонстрации

SimpleAIAgent объединяет чат-процессор и дисплей-процессор с генератором ввода, создаёт Pipeline и запускает его асинхронно с помощью PipelineRunner и PipelineTask. Производитель кадров ставит кадры в очередь и сигнализирует об окончании. Код ниже:

class SimpleAIAgent:
   """Simple conversational AI agent using Pipecat"""
   def __init__(self):
       self.chat_processor = SimpleChatProcessor()
       self.display_processor = TextDisplayProcessor()
       self.input_generator = ConversationInputGenerator()
 
 
   def create_pipeline(self) -> Pipeline:
       return Pipeline([self.chat_processor, self.display_processor])
 
 
   async def run_demo(self):
       print(" Simple Pipecat AI Agent Demo")
       print(" Conversational AI with HuggingFace")
       print("=" * 50)
 
 
       pipeline = self.create_pipeline()
       runner = PipelineRunner()
       task = PipelineTask(pipeline)
 
 
       async def produce_frames():
           async for frame in self.input_generator.generate_conversation():
               await task.queue_frame(frame)
           await task.stop_when_done()
 
 
       try:
           print(" Running conversation demo...\n")
           await asyncio.gather(
               runner.run(task),     
               produce_frames(),    
           )
       except Exception as e:
           print(f" Demo error: {e}")
           logging.error(f"Pipeline error: {e}")
 
 
       print(" Demo completed successfully!")

Главная функция и детекция окружения

В конце примера определена main-корутина, которая инициализирует агента, запускает демонстрацию, печатает сводку и определяет, выполняется ли код в Google Colab. Затем вызывается await main() для запуска пайплайна. Точный код был приведён ранее и включён в пример.

Зачем такой подход

Разделение ответственности — ввод, инференс и отображение — позволяет легко заменять модели, добавлять голосовые модули, подключать хранилище контекста или развёртывать конвейер как веб-сервис. Pipecat обеспечивает гибкую основу для последовательного расширения без перестройки основной логики обработки.

🇬🇧

Switch Language

Read this article in English

Switch to English