Step-by-step guide to creating custom tasks and pipelines
A minimal guide to creating custom tasks and pipelines. You'll build a two-step pipeline: the LLM extracts People directly, then you insert them into the knowledge graph.

Before you start:

- Complete the Quickstart to understand basic operations
```python
import asyncio
from typing import Any, Dict, List

from pydantic import BaseModel, SkipValidation

import cognee
from cognee.modules.engine.operations.setup import setup
from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.infrastructure.engine import DataPoint
from cognee.tasks.storage import add_data_points
from cognee.modules.pipelines import Task, run_pipeline


class Person(DataPoint):
    name: str
    # Optional relationships (we'll let the LLM populate this)
    knows: List["Person"] = []
    # Make names searchable in the vector store
    metadata: Dict[str, Any] = {"index_fields": ["name"]}


class People(BaseModel):
    persons: List[Person]


async def extract_people(text: str) -> List[Person]:
    system_prompt = (
        "Extract people mentioned in the text. "
        "Return as `persons: Person[]` with each Person having `name` and optional `knows` relations. "
        "If the text says someone knows someone set `knows` accordingly. "
        "Only include facts explicitly stated."
    )
    people = await LLMGateway.acreate_structured_output(text, system_prompt, People)
    return people.persons


async def main():
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)
    await setup()

    text = "Alice knows Bob."

    tasks = [
        Task(extract_people),   # input: text -> output: list[Person]
        Task(add_data_points),  # input: list[Person] -> output: list[Person]
    ]

    async for _ in run_pipeline(tasks=tasks, data=text, datasets=["people_demo"]):
        pass


if __name__ == "__main__":
    asyncio.run(main())
```
This simple example uses a two-step pipeline for demonstration. In practice, you can create complex pipelines with multiple custom tasks, data transformations, and processing steps.
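As a sketch of how a third step could slot in, the transformation below normalizes extracted names before storage. It uses a hypothetical `PersonStub` stand-in (not part of cognee) so the snippet runs without the library; in a real pipeline the function would operate on `Person` DataPoints and be wrapped as a middle `Task(...)`:

```python
from dataclasses import dataclass, field
from typing import List


# Hypothetical stand-in for the Person DataPoint, for illustration only
@dataclass
class PersonStub:
    name: str
    knows: List["PersonStub"] = field(default_factory=list)


def normalize_names(people: List[PersonStub]) -> List[PersonStub]:
    """Trim whitespace and title-case each extracted name."""
    for person in people:
        person.name = person.name.strip().title()
    return people


# In the real pipeline this would become a middle step:
# tasks = [Task(extract_people), Task(normalize_names), Task(add_data_points)]
```

Because each task's output feeds the next task's input, a transformation like this only needs to accept and return the same shape of data as its neighbors.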
Create Pydantic models for your data. `Person` inherits from `DataPoint` so it can be inserted into the graph, while `People` is a simple container for the LLM output. Setting `index_fields` in `metadata` is recommended to make those fields searchable in the vector database.
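The relevant excerpt from the full example above (requires cognee to be installed):

```python
from typing import Any, Dict, List

from pydantic import BaseModel

from cognee.infrastructure.engine import DataPoint


class Person(DataPoint):
    name: str
    # Optional relationships (we'll let the LLM populate this)
    knows: List["Person"] = []
    # Make names searchable in the vector store
    metadata: Dict[str, Any] = {"index_fields": ["name"]}


class People(BaseModel):
    persons: List[Person]
```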
```python
async def extract_people(text: str) -> List[Person]:
    system_prompt = (
        "Extract people mentioned in the text. "
        "Return as `persons: Person[]` with each Person having `name` and optional `knows` relations. "
        "If the text says someone knows someone set `knows` accordingly. "
        "Only include facts explicitly stated."
    )
    people = await LLMGateway.acreate_structured_output(text, system_prompt, People)
    return people.persons
```
This task uses the LLM to extract structured data from text. The LLM fills a `People` object with `Person` instances, including relationships via the `knows` field.
`acreate_structured_output` is backend-agnostic (BAML or LiteLLM + Instructor). Configure the backend via `STRUCTURED_OUTPUT_FRAMEWORK` in your `.env`.
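For example, the `.env` entry might look like this (the exact accepted values are an assumption here; check the Cognee configuration reference for the valid backend names):

```shell
# Hypothetical value, assuming the backend name itself is accepted
STRUCTURED_OUTPUT_FRAMEWORK=BAML
```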
```python
tasks = [
    Task(extract_people),   # input: text -> output: list[Person]
    Task(add_data_points),  # input: list[Person] -> output: list[Person]
]

async for _ in run_pipeline(tasks=tasks, data=text, datasets=["people_demo"]):
    pass
```
Chain your tasks together in a pipeline. The first task extracts people from the text, the second inserts them into the knowledge graph. `add_data_points` automatically creates nodes and edges from the `knows` relationships.

Under the hood, `run_pipeline(...)` automatically initializes the databases and checks the LLM/embeddings configuration, so you don't need to worry about setup. Once the pipeline completes, your Cognee memory with graph and embeddings is created and ready for interaction.

You can now search your data using the standard search methods:
```python
from cognee.api.v1.search import SearchType

# Search the processed data
results = await cognee.search(
    query_type=SearchType.GRAPH_COMPLETION,
    query_text="Who does Alice know?",
    datasets=["people_demo"],
)
print(results)
```