Transform Your Data with Aggregation
Overview
In this guide, you can learn how to use the Kotlin Sync driver to perform aggregation operations.
You can use aggregation operations to process data in your MongoDB collections and return computed results. The MongoDB Aggregation framework, which is part of the Query API, is modeled on the concept of a data processing pipeline. Documents enter a pipeline that contains one or more stages, and each stage transforms the documents to output a final aggregated result.
You can think of an aggregation operation as similar to a car factory. A car factory has an assembly line, which contains assembly stations with specialized tools to do specific jobs, like drills and welders. Raw parts enter the factory, and then the assembly line transforms and assembles them into a finished product.
The aggregation pipeline is the assembly line, aggregation stages are the assembly stations, and operator expressions are the specialized tools.
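To make the pipeline model concrete, the following minimal sketch treats a stage as a function over a list of documents and runs the stages in order. It uses only the Kotlin standard library. The names Doc, Stage, and runPipeline are hypothetical and are not part of the Kotlin Sync driver.

```kotlin
// Illustration only: a document is modeled as a plain map,
// not as the driver's Document type.
typealias Doc = Map<String, Any?>

// A stage receives the documents produced so far and returns transformed documents.
typealias Stage = (List<Doc>) -> List<Doc>

// Runs each stage in order, feeding the output of one stage into the next.
fun runPipeline(input: List<Doc>, stages: List<Stage>): List<Doc> =
    stages.fold(input) { docs, stage -> stage(docs) }

fun main() {
    // Hypothetical "raw parts" entering the assembly line.
    val parts: List<Doc> = listOf(
        mapOf("part" to "door", "painted" to false),
        mapOf("part" to "hood", "painted" to true),
    )

    // First station: keep only unpainted parts.
    val keepUnpainted: Stage = { docs -> docs.filter { it["painted"] == false } }
    // Second station: mark every remaining part as painted.
    val paint: Stage = { docs -> docs.map { it + ("painted" to true) } }

    println(runPipeline(parts, listOf(keepUnpainted, paint)))
}
```

In a real aggregation, MongoDB runs the stages on the server and the driver only sends the pipeline definition.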
Compare Aggregation and Find Operations
You can use find operations to perform the following actions:
Select which documents to return
Select which fields to return
Sort the results
You can use aggregation operations to perform the following actions:
Perform find operations
Rename fields
Calculate fields
Summarize data
Group values
Limitations
The following limitations apply when using aggregation operations:
Returned documents must not violate the BSON document size limit of 16 megabytes.
Pipeline stages have a memory limit of 100 megabytes by default. You can exceed this limit by calling the allowDiskUse() method of the AggregateIterable class.
Important
$graphLookup exception
The $graphLookup stage has a strict
memory limit of 100 megabytes and ignores the allowDiskUse
option.
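The following sketch shows one way you might enable disk use for a pipeline that could exceed the stage memory limit. It assumes that the collection argument is a MongoCollection<Document> connected to a deployment that you set up separately. The function name countByBoroughWithDiskUse is hypothetical.

```kotlin
import com.mongodb.client.model.Accumulators
import com.mongodb.client.model.Aggregates
import com.mongodb.kotlin.client.MongoCollection
import org.bson.Document

// Hypothetical helper: groups documents by borough and lets stages
// write temporary data to disk if they exceed the memory limit.
fun countByBoroughWithDiskUse(collection: MongoCollection<Document>): List<Document> {
    val pipeline = listOf(
        Aggregates.group("\$borough", Accumulators.sum("count", 1))
    )

    return collection.aggregate<Document>(pipeline)
        .allowDiskUse(true) // has no effect on a $graphLookup stage
        .toList()
}
```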
Aggregation Example
The examples in this section use the restaurants
collection in the sample_restaurants
database from the Atlas sample datasets. To learn how to create a
free MongoDB Atlas cluster and load the sample datasets, see the
Get Started with Atlas guide.
The following Kotlin data class models the documents in this collection:
data class Restaurant(
    val name: String,
    val cuisine: String,
    val borough: String
)
Build and Execute an Aggregation Pipeline
To perform an aggregation on the documents in a collection, pass a list of aggregation
stages to the aggregate()
method.
This example outputs a count of the number of bakeries in each borough of New York City. The following code creates an aggregation pipeline that contains the following stages:
A $match stage to filter for documents in which the value of the cuisine field is "Bakery".
A $group stage to group the matching documents by the borough field, producing a count of documents for each distinct value of that field.
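import com.mongodb.client.model.Accumulators
import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Filters
import org.bson.Document

// Match bakeries, then count them per borough
val pipeline = listOf(
    Aggregates.match(Filters.eq(Restaurant::cuisine.name, "Bakery")),
    Aggregates.group("\$borough", Accumulators.sum("count", 1))
)

val results = collection.aggregate<Document>(pipeline)
results.forEach { result -> println(result) }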
Document{{_id=Bronx, count=71}}
Document{{_id=Manhattan, count=221}}
Document{{_id=Brooklyn, count=173}}
Document{{_id=Queens, count=204}}
Document{{_id=Staten Island, count=20}}
Document{{_id=Missing, count=2}}
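To see what the two stages compute, you can compare them with the equivalent logic on an in-memory list. The following sketch uses only the Kotlin standard library and hypothetical sample data. It is an analogy for the $match and $group stages, not a description of how MongoDB executes them. The function name countByBorough is an assumption made for this illustration.

```kotlin
data class Restaurant(val name: String, val cuisine: String, val borough: String)

// Mirrors the two stages in plain Kotlin:
//   $match              -> filter
//   $group with sum(1)  -> groupingBy + eachCount
fun countByBorough(restaurants: List<Restaurant>, cuisine: String): Map<String, Int> =
    restaurants
        .filter { it.cuisine == cuisine }
        .groupingBy { it.borough }
        .eachCount()

fun main() {
    // Hypothetical data, not taken from the sample_restaurants dataset.
    val sample = listOf(
        Restaurant("A", "Bakery", "Bronx"),
        Restaurant("B", "Bakery", "Queens"),
        Restaurant("C", "Pizza", "Bronx"),
        Restaurant("D", "Bakery", "Bronx"),
    )

    println(countByBorough(sample, "Bakery")) // prints {Bronx=2, Queens=1}
}
```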
Tip
When specifying a group key for the $group
aggregation stage, ensure that you
escape any $
characters by using the \
character.
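Kotlin treats an unescaped $ inside a string literal as the start of a string template, which is why the group key needs the escape. The following standard-library sketch shows two equivalent ways to write the literal and a hypothetical fieldRef helper that builds the key from a property name.

```kotlin
data class Restaurant(val name: String, val cuisine: String, val borough: String)

// Hypothetical helper: prefixes a field name with "$" to form a field path.
fun fieldRef(fieldName: String): String = "\$" + fieldName

fun main() {
    val escaped = "\$borough"        // backslash escape
    val templated = "${'$'}borough"  // template that inserts a literal '$' character
    val fromProperty = fieldRef(Restaurant::borough.name)

    // All three values are the same string.
    println(escaped == templated && templated == fromProperty) // prints true
}
```

Building the key from the property reference keeps it in sync with the data class if you rename the field.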
Explain an Aggregation
To view information about how MongoDB executes your operation, you can
call the explain() method on the AggregateIterable instance that the
aggregate() method returns. When MongoDB explains an
operation, it returns execution plans and performance statistics. An execution
plan is a potential way MongoDB can complete an operation.
When you instruct MongoDB to explain an operation, it returns both the
plan MongoDB selected for the operation and any rejected execution plans.
The following code example runs the same aggregation shown in the preceding section
and calls the explain() method to output the operation details:
print(collection.aggregate(pipeline).explain())
{
  "explainVersion": "2",
  "queryPlanner": {
    "namespace": "sample_restaurants.restaurants",
    "indexFilterSet": false,
    "parsedQuery": {
      "cuisine": {"$eq": "Bakery"}
    },
    "queryHash": "865F14C3",
    "planCacheKey": "0697561B",
    "optimizedPipeline": true,
    "maxIndexedOrSolutionsReached": false,
    "maxIndexedAndSolutionsReached": false,
    "maxScansToExplodeReached": false,
    "winningPlan": { ... }
    ...
  }
  ...
}
Atlas Search
You can perform an Atlas Search query by creating and running an aggregation pipeline that contains one of the following pipeline stages:
$search
$searchMeta
To learn more about Atlas Search pipeline stages, see Choose the Aggregation Pipeline Stage in the Atlas documentation.
Create a Pipeline Search Stage
You can create the search criteria in your Atlas Search pipeline stage by using Search operators.
The Kotlin Sync driver provides helper methods for the following operators:
Operator | Description |
---|---|
autocomplete | Performs a search for a word or phrase that contains a sequence of characters from an incomplete input string. |
compound | Combines two or more operators into a single query. |
equals | Checks whether a field matches a value you specify. Maps to the equals() and equalsNull() methods. |
exists | Tests if a path to a specified indexed field name exists in a document. |
in | Performs a search for an array of BSON number, date, boolean, objectId, uuid, or string values at the given path and returns documents where the value of the field equals any value in the specified array. |
moreLikeThis | Returns documents similar to input documents. |
near | Supports querying and scoring numeric, date, and GeoJSON point values. |
phrase | Performs a search for documents containing an ordered sequence of terms using the analyzer specified in the index configuration. |
queryString | Supports querying a combination of indexed fields and values. |
range | Supports querying and scoring numeric, date, and string values. Maps to the numberRange() and dateRange() methods. |
regex | Interprets the query field as a regular expression. |
text | Performs a full-text search using the analyzer that you specify in the index configuration. |
wildcard | Enables queries which use special characters in the search string that can match any character. |
Example Pipeline Search Stage
Note
Atlas Sample Dataset
This example uses the sample_mflix.movies
collection from the Atlas sample
datasets. To learn how to set up a free-tier Atlas cluster and load the
sample dataset, see the Get Started with Atlas tutorial
in the Atlas documentation.
Before you can run this example, you must create an Atlas Search index on the movies
collection that has the following definition:
{
  "mappings": {
    "dynamic": true,
    "fields": {
      "title": {
        "analyzer": "lucene.keyword",
        "type": "string"
      },
      "genres": {
        "normalizer": "lowercase",
        "type": "token"
      }
    }
  }
}
To learn more about creating Atlas Search indexes, see the Atlas Search and Vector Search Indexes guide.
The following code creates a $search
stage that has the following
specifications:
Checks that the genres array includes "Comedy"
Searches the fullplot field for the phrase "new york"
Matches year values greater than 1950 and less than 2000
Searches for title values that begin with the term "Love"
import com.mongodb.client.model.Aggregates
import com.mongodb.client.model.Projections
import com.mongodb.client.model.search.SearchOperator
import com.mongodb.client.model.search.SearchPath.fieldPath

// Every clause in the filter list must match
val searchStage = Aggregates.search(
    SearchOperator.compound()
        .filter(
            listOf(
                SearchOperator.`in`(fieldPath("genres"), listOf("Comedy")),
                SearchOperator.phrase(fieldPath("fullplot"), "new york"),
                SearchOperator.numberRange(fieldPath("year")).gtLt(1950, 2000),
                SearchOperator.wildcard(fieldPath("title"), "Love *")
            )
        )
)

val projectStage = Aggregates.project(
    Projections.include("title", "year", "genres"))

val pipeline = listOf(searchStage, projectStage)
val results = collection.aggregate(pipeline)

results.forEach { result -> println(result) }
Document{{_id=..., genres=[Comedy, Romance], title=Love at First Bite, year=1979}}
Document{{_id=..., genres=[Comedy, Drama], title=Love Affair, year=1994}}
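To build intuition for why the wildcard pattern "Love *" matches these titles, the following standard-library sketch translates a wildcard pattern into a Kotlin Regex. It assumes that * matches zero or more characters and ? matches exactly one character, and it does not handle escape sequences. The wildcardToRegex function is hypothetical and does not reflect how Atlas Search evaluates the operator internally.

```kotlin
// Illustration only: converts a wildcard pattern into a Regex.
// Assumes "*" matches zero or more characters and "?" matches exactly one.
// Escape sequences in the pattern are not handled in this sketch.
fun wildcardToRegex(pattern: String): Regex {
    val body = buildString {
        for (ch in pattern) {
            when (ch) {
                '*' -> append(".*")
                '?' -> append('.')
                // Quote every other character so it matches literally.
                else -> append(Regex.escape(ch.toString()))
            }
        }
    }
    return Regex(body)
}

fun main() {
    val startsWithLove = wildcardToRegex("Love *")

    println(startsWithLove.matches("Love Affair"))  // prints true
    println(startsWithLove.matches("Lovely Bones")) // prints false: no space after "Love"
}
```

Because the index definition in this example applies the lucene.keyword analyzer to the title field, the pattern is compared against the whole title rather than against individual words.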
To learn more about the Atlas Search helper methods, see the SearchOperator interface reference in the Driver Core API documentation.
Additional Information
To view a full list of expression operators, see Aggregation Operators in the MongoDB Server manual.
To learn about assembling an aggregation pipeline and view examples, see Aggregation Pipeline in the MongoDB Server manual.
To learn more about creating pipeline stages, see Aggregation Stages in the MongoDB Server manual.
To learn more about explaining MongoDB operations, see Explain Output and Query Plans in the MongoDB Server manual.
API Documentation
For more information about executing aggregation operations with the Kotlin Sync driver, see the following API documentation: