tac team mailing list archive
-
tac team
-
Mailing list archive
-
Message #00038
[Merge] lp:~danielschnurr/tacenergy/oneQueuePerUser into lp:tacenergy
Daniel Schnurr has proposed merging lp:~danielschnurr/tacenergy/oneQueuePerUser into lp:tacenergy.
Requested reviews:
TAC (tac)
QueueOutputService has been refactored to use a single output queue per user (users.<username>.outputQueue) instead of multiple queues per user.
QueueOutputService is now used for all outgoing notifications, and TopicManagementService has been deleted.
--
https://code.launchpad.net/~danielschnurr/tacenergy/oneQueuePerUser/+merge/25069
Your team TAC is requested to review the proposed merge of lp:~danielschnurr/tacenergy/oneQueuePerUser into lp:tacenergy.
=== modified file 'grails-app/domain/edu/kit/iism/cdamarket/Competition.groovy'
--- grails-app/domain/edu/kit/iism/cdamarket/Competition.groovy 2010-05-02 14:36:56 +0000
+++ grails-app/domain/edu/kit/iism/cdamarket/Competition.groovy 2010-05-11 14:03:33 +0000
@@ -41,6 +41,7 @@
Date lastUpdated = new Date()
BigDecimal balancingCostOver = 0
BigDecimal balancingCostUnder = 0
+ String queueNames
static constraints = {
name(unique: true, blank: false)
@@ -65,6 +66,7 @@
lastUpdated(nullable: false)
balancingCostOver(nullable: false, scale: 2)
balancingCostUnder(nullable: false, scale: 2)
+ queueNames(nullable: true)
}
public String toString() {
=== modified file 'grails-app/services/edu/kit/iism/cdamarket/AuctionService.groovy'
--- grails-app/services/edu/kit/iism/cdamarket/AuctionService.groovy 2010-05-02 14:36:56 +0000
+++ grails-app/services/edu/kit/iism/cdamarket/AuctionService.groovy 2010-05-11 14:03:33 +0000
@@ -23,7 +23,6 @@
boolean transactional = true
def idGeneratorService
- def topicManagementService
def queueOutputService
public Shout addShout(Shout shout) throws OrderEntryException, MarketOrderException, ExecutionException, SettlementException {
@@ -189,7 +188,7 @@
buySellIndicator: shout.getBuySellIndicator())
if (!execTransactionLog.save()) throw new ExecutionException("Failed to create Trade TransactionLog")
- topicManagementService.sendTradeNotification(execTransactionLog)
+ queueOutputService.sendTradeNotification(execTransactionLog)
executeSettlement(execTransactionLog)
return modShout
@@ -369,7 +368,7 @@
def newOrderbook = new Orderbook(product: shout.getProduct(), transactionID: shout.getTransactionID(), dateExecuted: shout.getDateMod())
newOrderbook.setOrderbookArray(newOrderbookArray)
if (!newOrderbook.save()) throw new OrderbookException("Failed to create Orderbook")
- topicManagementService.sendOrderbookNotification(newOrderbook)
+ queueOutputService.sendOrderbookNotification(newOrderbook)
//Set old orderbook entry outdated
if (!firstOrderbook) {
@@ -401,7 +400,7 @@
|| latestTransactionLog.getAsk() != newTransactionLog.getAsk()
|| latestTransactionLog.getAskSize() != newTransactionLog.getAskSize()) {
if (!newTransactionLog.save()) throw new OrderbookException("Failed to create Quote TransactionLog")
- topicManagementService.sendQuoteNotification(newTransactionLog)
+ queueOutputService.sendQuoteNotification(newTransactionLog)
}
}
}
=== modified file 'grails-app/services/edu/kit/iism/cdamarket/CompetitionControlService.groovy'
--- grails-app/services/edu/kit/iism/cdamarket/CompetitionControlService.groovy 2010-05-06 15:33:28 +0000
+++ grails-app/services/edu/kit/iism/cdamarket/CompetitionControlService.groovy 2010-05-11 14:03:33 +0000
@@ -26,7 +26,7 @@
def idGeneratorService
def productManagementService
- def topicManagementService
+ def queueOutputService
def queueManagementService
def forecastGenerationService
def jmsBrokerManagementService
@@ -45,6 +45,7 @@
competition.save(flush: true)
productManagementService.initProducts(competition) //initialize products (Timeslots) for this competition
createParticipants(competition)
+ setQueueNames(competition)
forecastGenerationService.initForecasts(competition)
jmsBrokerManagementService.initTopicsAndQueues()
return competition
@@ -56,7 +57,7 @@
log.debug('Starting competition...')
enableParticipants(competition, true)
log.debug('Participants enabled...')
- topicManagementService.sendCompetitionNotification(competition, ActionType.Started)
+ queueOutputService.sendCompetitionNotification(competition, ActionType.Started)
Thread.currentThread().sleep(2000) //Sleep for one second in order to ensure that the competition notification is received by all participants first
log.debug('Comepetition start announced...')
productManagementService.openInitialProducts(competition)
@@ -76,7 +77,11 @@
public void stop(Competition competition) throws CompetitionManagementException, CompetitionNotFoundException {
if (!competition) throw new CompetitionNotFoundException("Cannot stop competition ${competition}")
try {
+<<<<<<< TREE
if (competition.competitionStatus == CompetitionStatus.Running) topicManagementService.sendCompetitionNotification(competition, ActionType.Stopped)
+=======
+ queueOutputService.sendCompetitionNotification(competition, ActionType.Stopped)
+>>>>>>> MERGE-SOURCE
log.debug('Competition stop announced...')
quartzScheduler.pauseTrigger('mySimpleTrigger', 'mySimpleTriggerGroup')
log.debug('Quartz scheduler stopped...')
@@ -100,7 +105,11 @@
public void reset(Competition competition) throws CompetitionManagementException, CompetitionNotFoundException {
if (!competition) throw new CompetitionNotFoundException("Cannot reset competition ${competition}")
try {
+<<<<<<< TREE
def currentCompetitionStatus = competition.competitionStatus
+=======
+ queueOutputService.sendCompetitionNotification(competition, ActionType.Reset)
+>>>>>>> MERGE-SOURCE
stop(competition)
if (currentCompetitionStatus == CompetitionStatus.Running) topicManagementService.sendCompetitionNotification(competition, ActionType.Reset)
deleteCompetionDomainClasses(competition)
@@ -163,6 +172,22 @@
}
}
+ private void setQueueNames(Competition competition) throws CompetitionNotFoundException {
+ if (!competition) throw new CompetitionNotFoundException("Cannot set queue names for competition ${competition}")
+ def users = Person.findAllByCompetition(competition)
+ String queueNames = ""
+ users.eachWithIndex { person, i ->
+ if (i == 0) {
+ queueNames = queueNames + "users.${person.username}.outputQueue"
+ } else {
+ queueNames = queueNames + ",users.${person.username}.outputQueue"
+ }
+ }
+ competition.queueNames = queueNames
+ competition.save()
+ log.debug "Generated String containing all queueNames: ${competition.queueNames} and saved to domain competition"
+ }
+
private void createParticipants(Competition competition) throws PersonCreationException, CompetitionNotFoundException {
if (!competition) throw new CompetitionNotFoundException("Cannot create participants for ${competition}")
for (i in 1..competition?.participantCount) {
=== modified file 'grails-app/services/edu/kit/iism/cdamarket/ForecastGenerationService.groovy'
--- grails-app/services/edu/kit/iism/cdamarket/ForecastGenerationService.groovy 2010-05-06 16:01:26 +0000
+++ grails-app/services/edu/kit/iism/cdamarket/ForecastGenerationService.groovy 2010-05-11 14:03:33 +0000
@@ -77,7 +77,7 @@
Random rand = new Random(person.forecastSeed)
targetProducts.keySet()?.each {product ->
def forecastHorizon = product.serialNumber - currentProduct.serialNumber
- if (forecastHorizon <= competition.timeslotsOpen) { //only generate forecasts that are really needed, i.e. within the forecast horizon
+ if (forecastHorizon < competition.timeslotsOpen) { //only generate forecasts that are really needed, i.e. within the forecast horizon
def forecast = targetProducts[product].trueValue //assume forecast = trueValue
if (forecastHorizon > 0) {//forecastTimeslot and current timeslot different -> calculate forecast
def counter = targetProducts[product].counter
=== modified file 'grails-app/services/edu/kit/iism/cdamarket/JmsBrokerManagementService.groovy'
--- grails-app/services/edu/kit/iism/cdamarket/JmsBrokerManagementService.groovy 2010-05-02 14:36:56 +0000
+++ grails-app/services/edu/kit/iism/cdamarket/JmsBrokerManagementService.groovy 2010-05-11 14:03:33 +0000
@@ -84,11 +84,13 @@
def initTopicsAndQueues() throws JMSException {
try {
BrokerViewMBean mbean = getMBean()
+ /*
mbean.addQueue ('public.ClientAnnounce')
mbean.addQueue ('public.ShoutInputQueue')
mbean.addTopic ('public.CompetitionAnnounce')
mbean.addTopic ('public.TimeAnnounce')
mbean.addTopic ('public.Products')
+ */
} catch (Exception e) {
throw new JMSException("Failed to initialize topics and queues: ${e.getMessage()}, ${e.getCause()}")
}
=== modified file 'grails-app/services/edu/kit/iism/cdamarket/ProductManagementService.groovy'
--- grails-app/services/edu/kit/iism/cdamarket/ProductManagementService.groovy 2010-05-06 15:51:27 +0000
+++ grails-app/services/edu/kit/iism/cdamarket/ProductManagementService.groovy 2010-05-11 14:03:33 +0000
@@ -25,7 +25,6 @@
boolean transactional = true
- def topicManagementService
def balancingPowerService
def queueOutputService
def idGeneratorService
@@ -90,13 +89,13 @@
if (i >= competition.deactivateTimeslotsAhead && i < (competition.deactivateTimeslotsAhead + competition.timeslotsOpen)) {
product.enabled = true
if (!product.save()) throw new OpenInitialProductsException("Could enable product '${product}': ${product?.errors}")
- topicManagementService.sendProductNotification(product, ActionType.Started)
+ queueOutputService.sendProductNotification(product, ActionType.Started)
log.info("Product opened: ${product}")
}
}
competition.currentCompetitionTime = currentProduct.startDateTime
competition.save()
- topicManagementService.sendCompetitionNotification(competition, ActionType.TimeAdvanced)
+ queueOutputService.sendCompetitionNotification(competition, ActionType.TimeAdvanced)
log.info("New competition time announced: ${currentProduct.startDateTime}")
log.debug('Trying to send out new forecasts...')
sendProductForecasts(competition)
@@ -129,7 +128,7 @@
balancingPowerService.balance(oldestProduct)
} catch (NoActiveProductException nape) {
log.error("No active products found -> Competition end reached.")
- topicManagementService.sendCompetitionNotification(competition, ActionType.Stopped)
+ queueOutputService.sendCompetitionNotification(competition, ActionType.Stopped)
log.debug('Competition stop announced...')
quartzScheduler.pauseTrigger('mySimpleTrigger', 'mySimpleTriggerGroup')
log.debug('Quartz scheduler stopped...')
@@ -161,7 +160,7 @@
log.debug("Product ${newCurrentProduct} now current product")
competition.currentCompetitionTime = newCurrentProduct.startDateTime
competition.save()
- topicManagementService.sendCompetitionNotification(competition, ActionType.TimeAdvanced)
+ queueOutputService.sendCompetitionNotification(competition, ActionType.TimeAdvanced)
log.info("New competition time: ${currentProduct.startDateTime}")
}
}
@@ -205,7 +204,7 @@
if (!product) throw new ProductNotFoundException("Could not deactivate product '${product}'")
product.enabled = enable ?: false
if (!product.save(flush: true)) throw new ProductEnableDisableException("Failed to set product.enabled to ${enable} for ${product}: ${product?.errors}")
- topicManagementService.sendProductNotification(product, enable ? ActionType.Started : ActionType.Stopped)
+ queueOutputService.sendProductNotification(product, enable ? ActionType.Started : ActionType.Stopped)
return product
}
=== modified file 'grails-app/services/edu/kit/iism/cdamarket/QueueOutputService.groovy'
--- grails-app/services/edu/kit/iism/cdamarket/QueueOutputService.groovy 2010-05-02 14:36:56 +0000
+++ grails-app/services/edu/kit/iism/cdamarket/QueueOutputService.groovy 2010-05-11 14:03:33 +0000
@@ -18,41 +18,110 @@
package edu.kit.iism.cdamarket
-import edu.kit.iism.cdamarket.ShoutNotificationAction
+import grails.converters.*
class QueueOutputService {
boolean transactional = false
+ /*
+ * Private notifications sent to the user specific queue
+ */
+
def sendOrderstatus(Shout shout) throws ShoutNotFoundException {
- if (!shout) throw new ShoutNotFoundException ("Cannot send shout '${shout}' to jms.")
- sendToQueue("users.${shout.person.username}.Orderstatus", shout.toShoutNotificationXml())
+ if (!shout) throw new ShoutNotFoundException("Cannot send shout '${shout}' to jms.")
+ sendToPersonalQueue("users.${shout.person.username}.outputQueue", shout.toShoutNotificationXml())
return null
}
def sendCash(CashPosition cashPosition) throws CashPositionNotFoundException {
if (!cashPosition) throw new CashPositionNotFoundException("Cannot send cashPosition ${cashPosition} to jms.")
- sendToQueue("users.${cashPosition.person.username}.Cash", cashPosition.toCashPositionNotificationXml())
+ sendToPersonalQueue("users.${cashPosition.person.username}.outputQueue", cashPosition.toCashPositionNotificationXml())
return null
}
def sendDepot(DepotPosition depotPosition) throws DepotPositionNotFoundException {
if (!depotPosition) throw new DepotPositionNotFoundException("Cannot send depotPosition ${depotPosition} to jms.")
- sendToQueue("users.${depotPosition.person.username}.Depot", depotPosition.toDepotPositionNotificationXml())
+ sendToPersonalQueue("users.${depotPosition.person.username}.outputQueue", depotPosition.toDepotPositionNotificationXml())
return null
}
def sendForecast(Forecast forecast) throws ForecastNotFoundException {
if (!forecast) throw new ForecastNotFoundException("Cannot send forecast ${forecast} to jms.")
- sendToQueue("users.${forecast.person.username}.Forecast", forecast.toForecastNotificationXml())
- return null
- }
-
- def sendToQueue(queueName, object) {
- try {
- sendQueueJMSMessage(queueName, object)
- } catch (Exception e) {
- log.error("Failed to send '${object}' to queue ${queueName}. ", e)
+ sendToPersonalQueue("users.${forecast.person.username}.outputQueue", forecast.toForecastNotificationXml())
+ return null
+ }
+
+ def sendToPersonalQueue(queueName, xmlString) {
+ try {
+ sendQueueJMSMessage(queueName, xmlString)
+ } catch (Exception e) {
+ log.error("Failed to send Notification '${xmlString}' to queue ${queueName}.", e)
+ }
+ return null
+ }
+
+ /*
+ * Public information sent to all private queues
+ */
+
+ def sendCompetitionNotification(Competition competition, ActionType actionType) throws CompetitionNotFoundException {
+ if (!competition) throw new CompetitionNotFoundException("Cannot send system notification for competition ${competition} to jms.")
+ def competitionNotification = competition.toCompetitionNotification()
+ if (actionType) competitionNotification.action = actionType
+ String xmlString = competitionNotification as XML
+ sendToQueues(xmlString)
+ log.info "Competition announcement sent: ${xmlString}"
+ return null
+
+ }
+
+ //send a product product notification of current product status ("Product opened" | "Product closed") and properties of respective product object
+
+ def sendProductNotification(Product product, ActionType actionType) throws ProductNotFoundException {
+ if (!product) throw new ProductNotFoundException("Cannot send product notification for product ${product} to jms.")
+ def productNotification = product.toProductNotification()
+ if (actionType) productNotification.action = actionType
+ String xmlString = productNotification as XML
+ sendToQueues(xmlString)
+ log.info "Product announcement sent: ${xmlString}"
+ return null
+ }
+
+ /*
+ * send quote, trade and orderbook notifications with properties of respective objects
+ */
+
+ def sendQuoteNotification(TransactionLog quote) throws QuoteNotFoundException {
+ if (!quote) throw new QuoteNotFoundException("Cannot send quote ${quote} to jms.")
+ String xmlString = quote.toQuoteLogNotificationXml()
+ sendToQueues(xmlString)
+ log.info "Quote announcement sent: ${xmlString}"
+ return null
+ }
+
+ def sendTradeNotification(TransactionLog trade) throws TradeNotFoundException {
+ if (!trade) throw new TradeNotFoundException("Cannot send quote ${trade} to jms.")
+ String xmlString = trade.toTradeLogNotificationXml()
+ sendToQueues(xmlString)
+ log.info "Trade announcement sent: ${xmlString}"
+ return null
+ }
+
+ def sendOrderbookNotification(Orderbook orderbook) throws OrderbookException {
+ if (!orderbook) throw new OrderbookException("Cannot send Orderbook ${orderbook} to jms.")
+ String xmlString = orderbook.toOrderbookNotificationXml()
+ sendToQueues(xmlString)
+ log.info "Ordebook announcement sent: ${xmlString}"
+ return null
+ }
+
+ def sendToQueues(String xmlString) {
+ String queueNames = Competition.findByCurrent(true).queueNames
+ try {
+ sendQueueJMSMessage(queueNames, xmlString)
+ } catch (Exception e) {
+ log.error("Failed to send object '${object}' to topic ${topicName}.", e)
}
return null
}
=== removed file 'grails-app/services/edu/kit/iism/cdamarket/TopicManagementService.groovy'
--- grails-app/services/edu/kit/iism/cdamarket/TopicManagementService.groovy 2010-05-02 14:36:56 +0000
+++ grails-app/services/edu/kit/iism/cdamarket/TopicManagementService.groovy 1970-01-01 00:00:00 +0000
@@ -1,85 +0,0 @@
-/*
- * Copyright 2009-2010 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an
- *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- *
- * either express or implied. See the License for the specific language
- * governing permissions and limitations under the License.
- */
-
-package edu.kit.iism.cdamarket
-
-import grails.converters.*
-
-class TopicManagementService {
-
- boolean transactional = false
- static pubSub = true
-
- def sendCompetitionNotification(Competition competition, ActionType actionType) throws CompetitionNotFoundException {
- if (!competition) throw new CompetitionNotFoundException("Cannot send system notification for competition ${competition} to jms.")
- def competitionNotification = competition.toCompetitionNotification()
- if (actionType) competitionNotification.action = actionType
- String xmlString = competitionNotification as XML
- sendToTopic("public.CompetitionAnnounce", xmlString)
- log.info "Competition announcement sent: ${xmlString}"
- return null
- }
-
- //send a product product notification of current product status ("Product opened" | "Product closed") and properties of respective product object
- def sendProductNotification(Product product, ActionType actionType) throws ProductNotFoundException {
- if (!product) throw new ProductNotFoundException("Cannot send product notification for product ${product} to jms.")
- def productNotification = product.toProductNotification()
- if (actionType) productNotification.action = actionType
- String xmlString = productNotification as XML
- sendToTopic("public.Products", xmlString)
- log.info "Product announcement sent: ${xmlString}"
- return null
- }
-
- /*
- * send quote, trade and orderbook notifications with properties of respective objects
- */
- def sendQuoteNotification(TransactionLog quote) throws QuoteNotFoundException {
- if (!quote) throw new QuoteNotFoundException ("Cannot send quote ${quote} to jms.")
- String xmlString = quote.toQuoteLogNotificationXml()
- sendToTopic("public.Quote", xmlString)
- log.info "Quote announcement sent: ${xmlString}"
- return null
- }
-
- def sendTradeNotification(TransactionLog trade) throws TradeNotFoundException {
- if (!trade) throw new TradeNotFoundException ("Cannot send quote ${trade} to jms.")
- String xmlString = trade.toTradeLogNotificationXml()
- sendToTopic("public.Trade", xmlString)
- log.info "Trade announcement sent: ${xmlString}"
- return null
- }
-
- def sendOrderbookNotification(Orderbook orderbook) throws OrderbookException {
- if (!orderbook) throw new OrderbookException("Cannot send Orderbook ${orderbook} to jms.")
- String xmlString = orderbook.toOrderbookNotificationXml()
- sendToTopic("public.Orderbook", xmlString)
- log.info "Ordebook announcement sent: ${xmlString}"
- return null
- }
-
- def sendToTopic(topicName, object) {
- try {
- sendPubSubJMSMessage(topicName, object)
- } catch (Exception e) {
- log.error("Failed to send object '${object}' to topic ${topicName}.", e)
- }
- return null
- }
-
-}