There are two ways to run an AI agent: on a schedule ("every 15 minutes, check for new emails") or on an event ("when a payment succeeds, generate the invoice").
Scheduled agents are expensive and slow. They run whether or not anything happened, burning compute every cycle. They also lag — something could have happened 14 minutes ago and you won't know for 14 more.
Event-driven agents run only when triggered. They respond in seconds. The power is that they aren't doing anything — until something happens. Then they handle it instantly.
This post shows four patterns with working code.
The architecture
External event (Stripe, GitHub, Typeform, Slack)
→ Webhook fires to your endpoint
→ Endpoint receives, validates signature, returns 200 immediately
→ Background task: run the agent with the event payload
→ Agent calls tools, generates output
→ Output: email sent / PR comment posted / CRM updated / Linear issue created
The immediate 200 response is critical. Webhook providers (Stripe, GitHub) retry if they don't get a response within 5–30 seconds. Your agent might take 10 seconds. Return the 200 immediately, process asynchronously.
Setup: FastAPI webhook server
pip install fastapi uvicorn anthropic httpx redis
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
import hashlib, hmac, json, os
import redis
app = FastAPI()
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
def is_duplicate(event_id: str, ttl_seconds: int = 86400) -> bool:
"""Idempotency check — returns True if already processed."""
key = f"processed:{event_id}"
if r.get(key):
return True
r.setex(key, ttl_seconds, "1")
return False
Pattern 1: Stripe payment → invoice agent
When a payment succeeds, the agent generates a GST-compliant invoice and emails it to the customer.
import anthropic
from email.mime.text import MIMEText
import smtplib
client = anthropic.Anthropic()
def verify_stripe_signature(payload: bytes, sig_header: str, secret: str) -> bool:
"""Verify Stripe webhook signature."""
timestamp = sig_header.split(",")[0].split("=")[1]
sig = sig_header.split(",")[1].split("=")[1]
signed_payload = f"{timestamp}.{payload.decode()}"
expected = hmac.new(secret.encode(), signed_payload.encode(), hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, sig)
async def process_stripe_payment(payload: dict):
intent = payload.get("data", {}).get("object", {})
customer_email = intent.get("receipt_email") or intent.get("metadata", {}).get("email")
amount_paise = intent["amount"] # Stripe uses smallest currency unit
currency = intent.get("currency", "inr").upper()
payment_id = intent["id"]
if is_duplicate(payment_id):
return
# Generate invoice with Claude
response = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=800,
messages=[{
"role": "user",
"content": f"""Generate a professional GST invoice email body for this payment.
Payment ID: {payment_id}
Amount: ₹{amount_paise / 100:.2f}
Customer email: {customer_email}
Date: {__import__('datetime').date.today().strftime('%d %B %Y')}
Business: {os.environ.get('BUSINESS_NAME', 'Your Business')}
GSTIN: {os.environ.get('BUSINESS_GSTIN', 'N/A')}
Write an invoice email (HTML) with:
- Formal header with invoice number (INV-{payment_id[-8:].upper()})
- Payment confirmation with amount
- GST breakdown (18% GST on service)
- Thank you note
Keep it professional and concise."""
}],
)
invoice_html = response.content[0].text
# Send email (simplified — use SendGrid/SES in production)
msg = MIMEText(invoice_html, "html")
msg["Subject"] = f"Invoice INV-{payment_id[-8:].upper()} - Payment Confirmed"
msg["From"] = os.environ["FROM_EMAIL"]
msg["To"] = customer_email
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
smtp.login(os.environ["FROM_EMAIL"], os.environ["EMAIL_PASSWORD"])
smtp.send_message(msg)
print(f"Invoice sent for payment {payment_id}")
@app.post("/webhooks/stripe")
async def stripe_webhook(request: Request, background_tasks: BackgroundTasks):
payload = await request.body()
sig = request.headers.get("stripe-signature", "")
if not verify_stripe_signature(payload, sig, os.environ["STRIPE_WEBHOOK_SECRET"]):
raise HTTPException(status_code=400, detail="Invalid signature")
event = json.loads(payload)
if event["type"] == "payment_intent.succeeded":
background_tasks.add_task(process_stripe_payment, event)
return {"status": "received"} # Return 200 immediately
Pattern 2: GitHub PR → review agent
When a PR is opened, the agent fetches the diff and posts a review comment.
def verify_github_signature(payload: bytes, sig_header: str, secret: str) -> bool:
expected = "sha256=" + hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, sig_header)
async def process_pr_opened(payload: dict):
pr = payload["pull_request"]
pr_number = pr["number"]
repo = payload["repository"]["full_name"]
diff_url = pr["diff_url"]
event_id = f"pr-{repo}-{pr_number}-{pr['head']['sha']}"
if is_duplicate(event_id):
return
# Fetch the diff
import httpx
async with httpx.AsyncClient() as http:
diff_response = await http.get(
diff_url,
headers={"Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}"},
)
diff = diff_response.text[:8000]
# Generate review with Claude
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=600,
messages=[{
"role": "user",
"content": f"""Review this PR diff. Focus on:
- Security issues (SQL injection, XSS, hardcoded secrets)
- Missing error handling on external calls
- Obvious bugs or logic errors
- Performance issues (N+1 queries, missing indexes)
PR: {pr['title']}
Author: {pr['user']['login']}
Diff:
{diff}
Format: markdown. Use "### Findings" and "### Suggestions" sections.
If nothing critical: say so briefly. Be direct, not verbose."""
}],
)
comment = response.content[0].text
# Post GitHub comment
async with httpx.AsyncClient() as http:
await http.post(
f"https://api.github.com/repos/{repo}/issues/{pr_number}/comments",
headers={
"Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}",
"Accept": "application/vnd.github+json",
},
json={"body": f"**AI Review**\n\n{comment}\n\n_Automated review — not a substitute for human review._"},
)
@app.post("/webhooks/github")
async def github_webhook(request: Request, background_tasks: BackgroundTasks):
payload_bytes = await request.body()
sig = request.headers.get("x-hub-signature-256", "")
event_type = request.headers.get("x-github-event", "")
if not verify_github_signature(payload_bytes, sig, os.environ["GITHUB_WEBHOOK_SECRET"]):
raise HTTPException(status_code=400, detail="Invalid signature")
payload = json.loads(payload_bytes)
if event_type == "pull_request" and payload.get("action") == "opened":
background_tasks.add_task(process_pr_opened, payload)
return {"status": "received"}
Pattern 3: Form submission → CRM qualification agent
When a new Typeform lead comes in, the agent researches the company and creates a CRM deal if qualified.
async def process_form_submission(payload: dict):
form_response = payload["form_response"]
event_id = form_response["token"]
if is_duplicate(event_id):
return
# Extract answers (Typeform field IDs vary by form)
answers = {a["field"]["ref"]: a.get("text", a.get("email", a.get("choice", {}).get("label", "")))
for a in form_response["answers"]}
company_domain = answers.get("company_website", "").replace("https://", "").replace("www.", "").split("/")[0]
submitter_email = answers.get("email", "")
use_case = answers.get("use_case", "")
# Research the company with Firecrawl
import httpx
async with httpx.AsyncClient() as http:
scrape_r = await http.post(
"https://api.firecrawl.dev/v1/scrape",
headers={"Authorization": f"Bearer {os.environ['FIRECRAWL_API_KEY']}"},
json={"url": f"https://{company_domain}", "formats": ["markdown"]},
)
company_info = scrape_r.json().get("markdown", "")[:3000] if scrape_r.ok else ""
# Qualify with Claude
response = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=300,
messages=[{
"role": "user",
"content": f"""Qualify this sales lead.
Company domain: {company_domain}
Company info: {company_info[:2000]}
Use case described: {use_case}
Email: {submitter_email}
Return JSON:
{{
"qualified": true/false,
"company_size": "startup/smb/mid-market/enterprise",
"icp_fit": "high/medium/low",
"reason": "one sentence",
"outreach_angle": "one sentence personalized opener if qualified"
}}"""
}],
)
qual = json.loads(response.content[0].text)
if qual.get("qualified") and qual.get("icp_fit") in ("high", "medium"):
# Create HubSpot deal (or your CRM)
async with httpx.AsyncClient() as http:
await http.post(
"https://api.hubapi.com/crm/v3/objects/deals",
headers={"Authorization": f"Bearer {os.environ['HUBSPOT_API_KEY']}"},
json={
"properties": {
"dealname": f"{company_domain} — Inbound",
"dealstage": "appointmentscheduled",
"description": f"ICP fit: {qual['icp_fit']}\nUse case: {use_case}",
}
},
)
# Send personalized outreach draft to sales rep via Slack
async with httpx.AsyncClient() as http:
await http.post(
os.environ["SALES_SLACK_WEBHOOK"],
json={
"text": f"*New qualified lead*: {submitter_email} from {company_domain}\n"
f"ICP fit: {qual['icp_fit']} | Reason: {qual['reason']}\n"
f"Suggested opener: _{qual['outreach_angle']}_"
},
)
@app.post("/webhooks/typeform")
async def typeform_webhook(request: Request, background_tasks: BackgroundTasks):
payload = await request.json()
background_tasks.add_task(process_form_submission, payload)
return {"status": "received"}
Pattern 4: Slack message → Linear task agent
When someone posts a message containing "TODO" or "follow up" in a Slack channel, the agent extracts the task and creates a Linear issue.
The no-code path for this is easier in n8n (Slack trigger → Code node → Linear node). Here's the Python version for full control:
async def process_slack_message(payload: dict):
event = payload.get("event", {})
if event.get("type") != "message" or event.get("bot_id"):
return # Skip bot messages
text = event.get("text", "")
channel = event.get("channel")
user = event.get("user")
ts = event.get("ts")
event_id = f"slack-{channel}-{ts}"
keywords = ["TODO", "todo", "follow up", "follow-up", "action item", "action:"]
if not any(kw in text for kw in keywords):
return
if is_duplicate(event_id):
return
# Extract task details with Claude
response = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=200,
messages=[{
"role": "user",
"content": f"""Extract a task from this Slack message.
Message: {text}
Return JSON:
{{
"title": "concise task title (under 60 chars)",
"description": "full context for the task",
"assignee_mention": "@username if mentioned or null",
"due_date": "YYYY-MM-DD if mentioned or null",
"priority": "urgent/high/medium/low"
}}"""
}],
)
task = json.loads(response.content[0].text)
# Create Linear issue
import httpx
async with httpx.AsyncClient() as http:
await http.post(
"https://api.linear.app/graphql",
headers={"Authorization": os.environ["LINEAR_API_KEY"]},
json={
"query": """
mutation CreateIssue($input: IssueCreateInput!) {
issueCreate(input: $input) { issue { id identifier url } }
}
""",
"variables": {
"input": {
"teamId": os.environ["LINEAR_TEAM_ID"],
"title": task["title"],
"description": f"{task['description']}\n\nSource: Slack message from {user}",
"priority": {"urgent": 1, "high": 2, "medium": 3, "low": 4}.get(task["priority"], 3),
}
},
},
)
print(f"Linear issue created: {task['title']}")
@app.post("/webhooks/slack")
async def slack_webhook(request: Request, background_tasks: BackgroundTasks):
payload = await request.json()
# Slack URL verification challenge
if payload.get("type") == "url_verification":
return {"challenge": payload["challenge"]}
background_tasks.add_task(process_slack_message, payload)
return {"status": "received"}
Idempotency is not optional
Stripe sends 3 retries if it doesn't get a 200. GitHub sends 3 retries. Typeform sends 3 retries. Your agent will receive duplicate events. Without idempotency:
- Customer gets 3 invoices for the same payment
- 3 PR review comments appear on the same PR
- 3 Linear issues created from the same Slack message
The is_duplicate() function using Redis TTL handles this. The event ID is the key — use the provider's unique ID (Stripe: payment_intent.id, GitHub: the PR number + commit SHA, Slack: channel + timestamp).
Deploying
Local development: use ngrok to expose your local FastAPI server:
uvicorn main:app --port 8000 &
ngrok http 8000
# Use the ngrok URL as your webhook endpoint in Stripe/GitHub dashboards
Production: deploy to Railway, Fly.io, or a VPS. The webhook handlers are stateless (state lives in Redis), so horizontal scaling works out of the box.
# Railway: one command deployment
railway up
The n8n automation guide has the no-code equivalent of patterns 1 and 4 if you'd rather use n8n than write Python.



