PostgreSQL – Database exception in Slick 3.0 during batch insertion

While batch inserting thousands of records every 5 seconds with Slick 3, I got:

org.postgresql.util.PSQLException: FATAL: sorry, too many clients already

My data access layer looks like:

val db: CustomPostgresDriver.backend.DatabaseDef = Database.forURL(url, user = user, password = password, driver = jdbcDriver)



override def insertBatch(rowList: List[T#TableElementType]): Future[Long] = {
  val res = db.run(insertBatchQuery(rowList)).map(_.head.toLong).recover {
    case ex: Throwable => RelationalRepositoryUtility.handleBatchOperationErrors(ex)
  }
  //db.close()
  res
}

override def insertBatchQuery(rowList: List[T#TableElementType]): FixedSqlAction[Option[Int], NoStream, Write] = {
  query ++= rowList
}

Closing the connection in insertBatch (the commented-out db.close() above) has no effect; it still gives the same error.
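Independent of where the connection is closed, it is also worth giving the pool itself a hard cap. A minimal sketch of that, assuming a hypothetical Typesafe config entry named mydb (the key names are standard Slick 3 settings; the values shown are illustrative):

// application.conf (hypothetical entry):
//   mydb {
//     url = "jdbc:postgresql://localhost/mydb"
//     driver = "org.postgresql.Driver"
//     connectionPool = "HikariCP"
//     numThreads = 10      // threads in Slick's async executor
//     maxConnections = 10  // upper bound on open connections
//   }
// forConfig builds a pooled DatabaseDef from the block above, so Slick never
// opens more than maxConnections connections at once.
val db: CustomPostgresDriver.backend.DatabaseDef = Database.forConfig("mydb")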

I call the insert batch from my code as follows:

val temp1 = list1.flatMap { li =>
  Future.sequence(li.map { trip =>
    val data = for {
      tripData <- TripDataRepository.insertQuery(trip.tripData)
      subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
    } yield ((tripData, subTripData))
    val res = db.run(data.transactionally)
    res
    //db.close()
  })
}

If I close the connection after the work, as in the commented-out code above, I get this error:

java.util.concurrent.RejectedExecutionException: Task slick.backend.DatabaseComponent$DatabaseDef$$anon$2@6c3ae2b6 rejected from java.util.concurrent.ThreadPoolExecutor@79d2d4eb[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
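A plausible reading of this error: db.close() also shuts down Slick's async executor, so any db.run that is still pending is rejected by the now-terminated thread pool. If the Database is to be closed at all, it can only happen after every pending Future has completed, along these lines (allInserts is a hypothetical name for the combined Future produced by the calling code):

import scala.concurrent.ExecutionContext.Implicits.global

// Close only once all pending inserts are done; until then the pool must stay up.
allInserts.onComplete(_ => db.close())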

Even after calling the method without Future.sequence, like this:

val temp1 = list.map { trip =>
  val data = for {
    tripData <- TripDataRepository.insertQuery(trip.tripData)
    subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
  } yield ((tripData, subTripData))
  val res = db.run(data.transactionally)
  res
}

I still get the "too many clients" error…

The root of this problem is that you are starting an unbounded number of Futures at the same time, each of which grabs a connection to the database: one for each entry in the list.

This can be solved by running the inserts serially, forcing each insert batch to depend on the previous one:

// Empty Future for the results. Replace Unit with the correct type - whatever
// "res" yields below.
val emptyFuture = Future.successful(Seq.empty[Unit])
// This will only insert one at a time. You could use list.sliding to batch the
// inserts if that was important.
val temp1 = list.foldLeft(emptyFuture) { (previousFuture, trip) =>
  previousFuture flatMap { previous =>
    // Inner code copied from your example.
    val data = for {
      tripData <- TripDataRepository.insertQuery(trip.tripData)
      subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
    } yield ((tripData, subTripData))
    val res = db.run(data.transactionally)
    // flatMap needs a Future back, and appending inside map also ensures the
    // next fold step cannot start until this insert has completed.
    res.map(previous :+ _)
  }
}
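The list.sliding hint in the comment above can be taken a step further: chain chunks instead of single trips, and combine each chunk into a single transactional db.run. This is a sketch rather than code from the original post; grouped, the chunk size of 100, and DBIO.sequence are my choices, and an implicit ExecutionContext is assumed in scope as in the question's code.

// Replace Any with whatever the for comprehension actually yields.
val emptyResults = Future.successful(Seq.empty[Any])
val batched = list.grouped(100).foldLeft(emptyResults) { (previousFuture, chunk) =>
  previousFuture flatMap { previous =>
    // Build one DBIO per trip and merge the chunk into a single action...
    val chunkAction = DBIO.sequence(chunk.map { trip =>
      for {
        tripData <- TripDataRepository.insertQuery(trip.tripData)
        subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
      } yield ((tripData, subTripData))
    })
    // ...so each chunk runs as one transaction on one pooled connection.
    db.run(chunkAction.transactionally).map(previous ++ _)
  }
}

Either way the key property is the same: no database action starts before the previous Future has resolved, so the number of simultaneously open connections stays bounded.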
