Using components: MongoDB Source

Use the MongoDB source component to read data stored in an MongoDB collection.

Connection

Select an existing MongoDB connection or create a new one (for more information, see Allowing Integrate.io ETL access to MongoDB.)

Source Properties

  • Source collection - the collection name from which the data will be imported.
  • Filter query - use MongoDB extended JSON to apply a filter on MongoDB's server side, or leave empty to query the entire collection. Note that $ is a special character that denotes a variable, so it must be escaped by a single back-slash in your extended JSON filter. For example:

    {"age":{"\$gt":24}}  - extract all documents where age is greater than 24.

    {"\$or":[{"price":{"\$exists":false}},{"price":{"\$eq":0}}]} - extract all documents where price is zero or does not exist.

    {"timestamp":{"\$gt":{"\$date":"2014-01-01T00:00:00.000Z"}}} - extract all documents where timestamp is greater than the date value 2014-01-01T00:00:00.000Z

Source Schema

After defining the source collection, select the fields to use in the source.

The fields you select are the only ones pulled from the source collection.

Define the data type for the field. Use the following table when matching MongoDB data types to Integrate.io ETL data types.

MongoDB Integrate.io ETL
String String
32 Bit Integer Integer
64 Bit Integer Long
Double Double
Date DateTime
Object Json
Array Json Array
Boolean Boolean
ObjectID String

Reading data incrementally from MongoDB

In order to read data incrementally (changes and additions) from a collection, we need a timestamp column that specifies when the data was updated (or inserted in collections where data is only inserted). In our example, this column is called "updated_at". When reading from the source collection, we’ll use a filter query to only read the rows that were updated since the last time the package executed. We can use the following filter query, in which $last_updated_at is a package variable (make sure to use a single back-slash to escape the $ as is mentioned above):
 
{"updated_at": {"\$gt":{"\$date": $last_updated_at}}}

thumbnail imageNote that the schema detection or data preview fails when using the variable in the filter query, as variables are not evaluated in design time.

You can use the predefined variable _PACKAGE_LAST_SUCCESSFUL_JOB_SUBMISSION_TIMESTAMP which returns the submission timestamp for the last successful execution of the package as you can see in the example below as a value for the variable, or use the ExecuteSqlDatetime function to execute a query on the target database to get the max (last) value of updated_at in the target. Wrap the variable with a CASE statement to handle empty values and to allow full load if required. Then convert the datetime timestamp to a Unix timestamp and multiply it times 1000 to result in a Unix timestamp in milliseconds like MongoDB uses.

full_load = 0
last_updated_at = CASE WHEN (COALESCE($_PACKAGE_LAST_SUCCESSFUL_JOB_SUBMISSION_TIMESTAMP,'')=='' OR $full_load==1) THEN 0 ELSE ToUnixTime(ToDate($_PACKAGE_LAST_SUCCESSFUL_JOB_SUBMISSION_TIMESTAMP)) * 1000 END

thumbnail image

In order to store additions or changes in your database destination, make sure to mark the id column as key and change the operation type to "merge."