<НА ГЛАВНУЮ

Агентный планировщик для wet-lab: разбор протоколов, расписание и безопасность с Salesforce CodeGen

'Руководство по построению автономного планировщика wet-lab протоколов на Python с использованием Salesforce CodeGen для парсинга, проверки инвентаря, расписания и проверки безопасности.'

Общая идея

Этот туториал показывает, как создать автономный Планировщик и Валидатор wet-lab протоколов на Python с использованием модели Salesforce CodeGen-350M-mono. Система преобразует свободно написанные протоколы в структурированные шаги, проверяет наличие и годность реактивов, формирует оптимальные расписания с предложениями по параллельному выполнению и помечает возможные биобезопасные или химические риски. Архитектура модульная: парсер, менеджер инвентаря, планировщик, валидатор безопасности и модуль LLM для предложений по оптимизации.

Загрузка модели CodeGen

Загружаем локальную версию CodeGen для inference без API. Используем float16 и автоматическое распределение устройств для ускорения на GPU.

import re, json, pandas as pd
from datetime import datetime, timedelta
from collections import defaultdict
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
 
 
MODEL_NAME = "Salesforce/codegen-350M-mono"
print("Loading CodeGen model (30 seconds)...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
tokenizer.pad_token = tokenizer.eos_token
model = AutoModelForCausalLM.from_pretrained(
   MODEL_NAME, torch_dtype=torch.float16, device_map="auto"
)
print("✓ Model loaded!")

Разбор протокола

ProtocolParser извлекает нумерованные шаги из текста протокола и определяет длительность, температуру и метки безопасности. Внутри есть эвристики для 'overnight', разных форм записи температур и распространённых предупреждений.

class ProtocolParser:
   def read_protocol(self, text):
       steps = []
       lines = text.split('\n')
       for i, line in enumerate(lines, 1):
           step_match = re.search(r'^(\d+)\.\s+(.+)', line.strip())
           if step_match:
               num, name = step_match.groups()
               context = '\n'.join(lines[i:min(i+4, len(lines))])
               duration = self._extract_duration(context)
               temp = self._extract_temp(context)
               safety = self._check_safety(context)
               steps.append({
                   'step': int(num), 'name': name, 'duration_min': duration,
                   'temp': temp, 'safety': safety, 'line': i, 'details': context[:200]
               })
       return steps
  
   def _extract_duration(self, text):
       text = text.lower()
       if 'overnight' in text: return 720
       match = re.search(r'(\d+)\s*(?:hour|hr|h)(?:s)?(?!\w)', text)
       if match: return int(match.group(1)) * 60
       match = re.search(r'(\d+)\s*(?:min|minute)(?:s)?', text)
       if match: return int(match.group(1))
       match = re.search(r'(\d+)-(\d+)\s*(?:min|minute)', text)
       if match: return (int(match.group(1)) + int(match.group(2))) // 2
       return 30
  
   def _extract_temp(self, text):
       text = text.lower()
       if '4°c' in text or '4 °c' in text or '4°' in text: return '4C'
       if '37°c' in text or '37 °c' in text: return '37C'
       if '-20°c' in text or '-80°c' in text: return 'FREEZER'
       if 'room temp' in text or 'rt' in text or 'ambient' in text: return 'RT'
       return 'RT'
  
   def _check_safety(self, text):
       flags = []
       text_lower = text.lower()
       if re.search(r'bsl-[23]|biosafety', text_lower): flags.append('BSL-2/3')
       if re.search(r'caution|corrosive|hazard|toxic', text_lower): flags.append('HAZARD')
       if 'sharp' in text_lower or 'needle' in text_lower: flags.append('SHARPS')
       if 'dark' in text_lower or 'light-sensitive' in text_lower: flags.append('LIGHT-SENSITIVE')
       if 'flammable' in text_lower: flags.append('FLAMMABLE')
       return flags

Менеджер инвентаря

InventoryManager читает CSV с запасами, нормализует даты истечения и предоставляет функции извлечения упоминаний реактивов и проверки наличия, срока годности и низких остатков.

class InventoryManager:
   def __init__(self, csv_text):
       from io import StringIO
       self.df = pd.read_csv(StringIO(csv_text))
       self.df['expiry'] = pd.to_datetime(self.df['expiry'])
  
   def check_availability(self, reagent_list):
       issues = []
       for reagent in reagent_list:
           reagent_clean = reagent.lower().replace('_', ' ').replace('-', ' ')
           matches = self.df[self.df['reagent'].str.lower().str.contains(
               '|'.join(reagent_clean.split()[:2]), na=False, regex=True
           )]
           if matches.empty:
               issues.append(f" {reagent}: NOT IN INVENTORY")
           else:
               row = matches.iloc[0]
               if row['expiry'] < datetime.now():
                   issues.append(f"  {reagent}: EXPIRED on {row['expiry'].date()} (lot {row['lot']})")
               elif (row['expiry'] - datetime.now()).days < 30:
                   issues.append(f"  {reagent}: Expires soon ({row['expiry'].date()}, lot {row['lot']})")
               if row['quantity'] < 10:
                   issues.append(f"  {reagent}: LOW STOCK ({row['quantity']} {row['unit']} remaining)")
       return issues
  
   def extract_reagents(self, protocol_text):
       reagents = set()
       patterns = [
           r'\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)\s+(?:antibody|buffer|solution)',
           r'\b([A-Z]{2,}(?:-[A-Z0-9]+)?)\b',
           r'(?:add|use|prepare|dilute)\s+([a-z\-]+\s*(?:antibody|buffer|substrate|solution))',
       ]
       for pattern in patterns:
           matches = re.findall(pattern, protocol_text, re.IGNORECASE)
           reagents.update(m.strip() for m in matches if len(m) > 2)
       return list(reagents)[:15]

Планирование и параллелизация

SchedulePlanner строит временную шкалу шагов, переносит очень долгие шаги на следующую дату, помечает шаги, которые можно выполнять параллельно, и предлагает простую оптимизацию для шагов с одинаковой температурой.

class SchedulePlanner:
   def make_schedule(self, steps, start_time="09:00"):
       schedule = []
       current = datetime.strptime(f"2025-01-01 {start_time}", "%Y-%m-%d %H:%M")
       day = 1
       for step in steps:
           end = current + timedelta(minutes=step['duration_min'])
           if step['duration_min'] > 480:
               day += 1
               current = datetime.strptime(f"2025-01-0{day} 09:00", "%Y-%m-%d %H:%M")
               end = current
           schedule.append({
               'step': step['step'], 'name': step['name'][:40],
               'start': current.strftime("%H:%M"), 'end': end.strftime("%H:%M"),
               'duration': step['duration_min'], 'temp': step['temp'],
               'day': day, 'can_parallelize': step['duration_min'] > 60,
               'safety': ', '.join(step['safety']) if step['safety'] else 'None'
           })
           if step['duration_min'] <= 480:
               current = end
       return schedule
  
   def optimize_parallelization(self, schedule):
       parallel_groups = []
       idle_time = 0
       for i, step in enumerate(schedule):
           if step['can_parallelize'] and i + 1 < len(schedule):
               next_step = schedule[i+1]
               if step['temp'] == next_step['temp']:
                   saved = min(step['duration'], next_step['duration'])
                   parallel_groups.append(
                       f" Steps {step['step']} & {next_step['step']} can overlap → Save {saved} min"
                   )
                   idle_time += saved
       return parallel_groups, idle_time

Валидатор безопасности

SafetyValidator реализует простые правила проверки pH и допустимых температур, отмечает потребность в биобезопасном шкафу и выдает рекомендации по СИЗ или особенностям обращения со слабо-совместимыми шагами.

class SafetyValidator:
   RULES = {
       'ph_range': (5.0, 11.0),
       'temp_limits': {'4C': (2, 8), '37C': (35, 39), 'RT': (20, 25)},
       'max_concurrent_instruments': 3,
   }
  
   def validate(self, steps):
       risks = []
       for step in steps:
           ph_match = re.search(r'ph\s*(\d+\.?\d*)', step['details'].lower())
           if ph_match:
               ph = float(ph_match.group(1))
               if not (self.RULES['ph_range'][0] <= ph <= self.RULES['ph_range'][1]):
                   risks.append(f"  Step {step['step']}: pH {ph} OUT OF SAFE RANGE")
           if 'BSL-2/3' in step['safety']:
               risks.append(f"  Step {step['step']}: BSL-2 cabinet REQUIRED")
           if 'HAZARD' in step['safety']:
               risks.append(f" Step {step['step']}: Full PPE + chemical hood REQUIRED")
           if 'SHARPS' in step['safety']:
               risks.append(f" Step {step['step']}: Sharps container + needle safety")
           if 'LIGHT-SENSITIVE' in step['safety']:
               risks.append(f" Step {step['step']}: Work in dark/amber tubes")
       return risks

Интеграция LLM и основной цикл агента

llm_call использует локально загруженную модель для генерации коротких советов по оптимизации. agent_loop связывает все модули: парсер, инвентарь, валидатор, планировщик и LLM-подсказку, возвращая полную структуру результатов.

def llm_call(prompt, max_tokens=200):
   try:
       inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=512).to(model.device)
       outputs = model.generate(
           **inputs, max_new_tokens=max_tokens, do_sample=True,
           temperature=0.7, top_p=0.9, pad_token_id=tokenizer.eos_token_id
       )
       return tokenizer.decode(outputs[0], skip_special_tokens=True)[len(prompt):].strip()
   except:
       return "Batch similar temperature steps together. Pre-warm instruments."
 
 
def agent_loop(protocol_text, inventory_csv, start_time="09:00"):
   print("\n AGENT STARTING PROTOCOL ANALYSIS...\n")
   parser = ProtocolParser()
   steps = parser.read_protocol(protocol_text)
   print(f" Parsed {len(steps)} protocol steps")
   inventory = InventoryManager(inventory_csv)
   reagents = inventory.extract_reagents(protocol_text)
   print(f" Identified {len(reagents)} reagents: {', '.join(reagents[:5])}...")
   inv_issues = inventory.check_availability(reagents)
   validator = SafetyValidator()
   safety_risks = validator.validate(steps)
   planner = SchedulePlanner()
   schedule = planner.make_schedule(steps, start_time)
   parallel_opts, time_saved = planner.optimize_parallelization(schedule)
   total_time = sum(s['duration'] for s in schedule)
   optimized_time = total_time - time_saved
   opt_prompt = f"Protocol has {len(steps)} steps, {total_time} min total. Key bottleneck optimization:"
   optimization = llm_call(opt_prompt, max_tokens=80)
   return {
       'steps': steps, 'schedule': schedule, 'inventory_issues': inv_issues,
       'safety_risks': safety_risks, 'parallelization': parallel_opts,
       'time_saved': time_saved, 'total_time': total_time,
       'optimized_time': optimized_time, 'ai_optimization': optimization,
       'reagents': reagents
   }

Генерация вывода и пример запуска

Генераторы формируют Markdown-чеклист и CSV для Gantt, чтобы представить расписание, список реактивов и предупреждения. Ниже — пример запуска на ELISA-протоколе и тестовом инвентаре.

def generate_checklist(results):
   md = "#  WET-LAB PROTOCOL CHECKLIST\n\n"
   md += f"**Total Steps:** {len(results['schedule'])}\n"
   md += f"**Estimated Time:** {results['total_time']} min ({results['total_time']//60}h {results['total_time']%60}m)\n"
   md += f"**Optimized Time:** {results['optimized_time']} min (save {results['time_saved']} min)\n\n"
   md += "##  TIMELINE\n"
   current_day = 1
   for item in results['schedule']:
       if item['day'] > current_day:
           md += f"\n### Day {item['day']}\n"
           current_day = item['day']
       parallel = " " if item['can_parallelize'] else ""
       md += f"- [ ] **{item['start']}-{item['end']}** | Step {item['step']}: {item['name']} ({item['temp']}){parallel}\n"
   md += "\n##  REAGENT PICK-LIST\n"
   for reagent in results['reagents']:
       md += f"- [ ] {reagent}\n"
   md += "\n##  SAFETY & INVENTORY ALERTS\n"
   all_issues = results['safety_risks'] + results['inventory_issues']
   if all_issues:
       for risk in all_issues:
           md += f"- {risk}\n"
   else:
       md += "-  No critical issues detected\n"
   md += "\n##  OPTIMIZATION TIPS\n"
   for tip in results['parallelization']:
       md += f"- {tip}\n"
   md += f"-  AI Suggestion: {results['ai_optimization']}\n"
   return md
 
 
def generate_gantt_csv(schedule):
   df = pd.DataFrame(schedule)
   return df.to_csv(index=False)
SAMPLE_PROTOCOL = """ELISA Protocol for Cytokine Detection
 
 
1. Coating (Day 1, 4°C overnight)
  - Dilute capture antibody to 2 μg/mL in coating buffer (pH 9.6)
  - Add 100 μL per well to 96-well plate
  - Incubate at 4°C overnight (12-16 hours)
  - BSL-2 cabinet required
 
 
2. Blocking (Day 2)
  - Wash plate 3× with PBS-T (200 μL/well)
  - Add 200 μL blocking buffer (1% BSA in PBS)
  - Incubate 1 hour at room temperature
 
 
3. Sample Incubation
  - Wash 3× with PBS-T
  - Add 100 μL diluted samples/standards
  - Incubate 2 hours at room temperature
 
 
4. Detection Antibody
  - Wash 5× with PBS-T
  - Add 100 μL biotinylated detection antibody (0.5 μg/mL)
  - Incubate 1 hour at room temperature
 
 
5. Streptavidin-HRP
  - Wash 5× with PBS-T
  - Add 100 μL streptavidin-HRP (1:1000 dilution)
  - Incubate 30 minutes at room temperature
  - Work in dark
 
 
6. Development
  - Wash 7× with PBS-T
  - Add 100 μL TMB substrate
  - Incubate 10-15 minutes (monitor color development)
  - Add 50 μL stop solution (2M H2SO4) - CAUTION: corrosive
"""
 
 
SAMPLE_INVENTORY = """reagent,quantity,unit,expiry,lot
capture antibody,500,μg,2025-12-31,AB123
blocking buffer,500,mL,2025-11-30,BB456
PBS-T,1000,mL,2026-01-15,PT789
detection antibody,8,μg,2025-10-15,DA321
streptavidin HRP,10,mL,2025-12-01,SH654
TMB substrate,100,mL,2025-11-20,TM987
stop solution,250,mL,2026-03-01,SS147
BSA,100,g,2024-09-30,BS741"""
 
 
results = agent_loop(SAMPLE_PROTOCOL, SAMPLE_INVENTORY, start_time="09:00")
print("\n" + "="*70)
print(generate_checklist(results))
print("\n" + "="*70)
print("\n GANTT CSV (first 400 chars):\n")
print(generate_gantt_csv(results['schedule'])[:400])
print("\n Time Savings:", f"{results['time_saved']} minutes via parallelization")
🇬🇧

Switch Language

Read this article in English

Switch to English