← Back to team overview

tac team mailing list archive

[Merge] lp:~danielschnurr/tacenergy/oneQueuePerUser into lp:tacenergy

 

Daniel Schnurr has proposed merging lp:~danielschnurr/tacenergy/oneQueuePerUser into lp:tacenergy.

Requested reviews:
  TAC (tac)


QueueOutputService refactored to use one output queue per user instead of multiple queues per user.
Using QueueOutputService for all outgoing notifications; deleted TopicManagementService.
-- 
https://code.launchpad.net/~danielschnurr/tacenergy/oneQueuePerUser/+merge/25069
Your team TAC is requested to review the proposed merge of lp:~danielschnurr/tacenergy/oneQueuePerUser into lp:tacenergy.
=== modified file 'grails-app/domain/edu/kit/iism/cdamarket/Competition.groovy'
--- grails-app/domain/edu/kit/iism/cdamarket/Competition.groovy	2010-05-02 14:36:56 +0000
+++ grails-app/domain/edu/kit/iism/cdamarket/Competition.groovy	2010-05-11 14:03:33 +0000
@@ -41,6 +41,7 @@
   Date lastUpdated = new Date()
   BigDecimal balancingCostOver = 0
   BigDecimal balancingCostUnder = 0
+  String queueNames
 
   static constraints = {
     name(unique: true, blank: false)
@@ -65,6 +66,7 @@
     lastUpdated(nullable: false)
     balancingCostOver(nullable: false, scale: 2)
     balancingCostUnder(nullable: false, scale: 2)
+    queueNames(nullable: true)
   }
 
   public String toString() {

=== modified file 'grails-app/services/edu/kit/iism/cdamarket/AuctionService.groovy'
--- grails-app/services/edu/kit/iism/cdamarket/AuctionService.groovy	2010-05-02 14:36:56 +0000
+++ grails-app/services/edu/kit/iism/cdamarket/AuctionService.groovy	2010-05-11 14:03:33 +0000
@@ -23,7 +23,6 @@
   boolean transactional = true
 
   def idGeneratorService
-  def topicManagementService
   def queueOutputService
 
   public Shout addShout(Shout shout) throws OrderEntryException, MarketOrderException, ExecutionException, SettlementException {
@@ -189,7 +188,7 @@
         buySellIndicator: shout.getBuySellIndicator())
 
     if (!execTransactionLog.save()) throw new ExecutionException("Failed to create Trade TransactionLog")
-    topicManagementService.sendTradeNotification(execTransactionLog)
+    queueOutputService.sendTradeNotification(execTransactionLog)
     executeSettlement(execTransactionLog)
 
     return modShout
@@ -369,7 +368,7 @@
       def newOrderbook = new Orderbook(product: shout.getProduct(), transactionID: shout.getTransactionID(), dateExecuted: shout.getDateMod())
       newOrderbook.setOrderbookArray(newOrderbookArray)
       if (!newOrderbook.save()) throw new OrderbookException("Failed to create Orderbook")
-      topicManagementService.sendOrderbookNotification(newOrderbook)
+      queueOutputService.sendOrderbookNotification(newOrderbook)
 
       //Set old orderbook entry outdated
       if (!firstOrderbook) {
@@ -401,7 +400,7 @@
           || latestTransactionLog.getAsk() != newTransactionLog.getAsk()
           || latestTransactionLog.getAskSize() != newTransactionLog.getAskSize()) {
         if (!newTransactionLog.save()) throw new OrderbookException("Failed to create Quote TransactionLog")
-        topicManagementService.sendQuoteNotification(newTransactionLog)
+        queueOutputService.sendQuoteNotification(newTransactionLog)
       }
     }
   }

=== modified file 'grails-app/services/edu/kit/iism/cdamarket/CompetitionControlService.groovy'
--- grails-app/services/edu/kit/iism/cdamarket/CompetitionControlService.groovy	2010-05-06 15:33:28 +0000
+++ grails-app/services/edu/kit/iism/cdamarket/CompetitionControlService.groovy	2010-05-11 14:03:33 +0000
@@ -26,7 +26,7 @@
 
   def idGeneratorService
   def productManagementService
-  def topicManagementService
+  def queueOutputService
   def queueManagementService
   def forecastGenerationService
   def jmsBrokerManagementService
@@ -45,6 +45,7 @@
     competition.save(flush: true)
     productManagementService.initProducts(competition) //initialize products (Timeslots) for this competition
     createParticipants(competition)
+    setQueueNames(competition)
     forecastGenerationService.initForecasts(competition)
     jmsBrokerManagementService.initTopicsAndQueues()
     return competition
@@ -56,7 +57,7 @@
       log.debug('Starting competition...')
       enableParticipants(competition, true)
       log.debug('Participants enabled...')
-      topicManagementService.sendCompetitionNotification(competition, ActionType.Started)
+      queueOutputService.sendCompetitionNotification(competition, ActionType.Started)
       Thread.currentThread().sleep(2000) //Sleep for one second in order to ensure that the competition notification is received by all participants first
       log.debug('Comepetition start announced...')
       productManagementService.openInitialProducts(competition)
@@ -76,7 +77,11 @@
   public void stop(Competition competition) throws CompetitionManagementException, CompetitionNotFoundException {
     if (!competition) throw new CompetitionNotFoundException("Cannot stop competition ${competition}")
     try {
+<<<<<<< TREE
       if (competition.competitionStatus == CompetitionStatus.Running) topicManagementService.sendCompetitionNotification(competition, ActionType.Stopped)
+=======
+      queueOutputService.sendCompetitionNotification(competition, ActionType.Stopped)
+>>>>>>> MERGE-SOURCE
       log.debug('Competition stop announced...')
       quartzScheduler.pauseTrigger('mySimpleTrigger', 'mySimpleTriggerGroup')
       log.debug('Quartz scheduler stopped...')
@@ -100,7 +105,11 @@
   public void reset(Competition competition) throws CompetitionManagementException, CompetitionNotFoundException {
     if (!competition) throw new CompetitionNotFoundException("Cannot reset competition ${competition}")
     try {
+<<<<<<< TREE
       def currentCompetitionStatus = competition.competitionStatus
+=======
+      queueOutputService.sendCompetitionNotification(competition, ActionType.Reset)
+>>>>>>> MERGE-SOURCE
       stop(competition)
       if (currentCompetitionStatus == CompetitionStatus.Running) topicManagementService.sendCompetitionNotification(competition, ActionType.Reset)
       deleteCompetionDomainClasses(competition)
@@ -163,6 +172,22 @@
     }
   }
 
+  private void setQueueNames(Competition competition) throws CompetitionNotFoundException { // stores a comma-separated list of every participant's output queue name on the competition
+    if (!competition) throw new CompetitionNotFoundException("Cannot set queue names for competition ${competition}")
+    def users = Person.findAllByCompetition(competition)
+    String queueNames = ""
+    users.eachWithIndex { person, i ->
+      if (i == 0) { // first entry carries no leading comma
+        queueNames = queueNames + "users.${person.username}.outputQueue"
+      } else {
+        queueNames = queueNames + ",users.${person.username}.outputQueue"
+      }
+    }
+    competition.queueNames = queueNames
+    competition.save() // NOTE(review): the result of save() is not checked, so a failed save would go unnoticed here - confirm that is acceptable
+    log.debug "Generated String containing all queueNames: ${competition.queueNames} and saved to domain competition"
+  }
+
   private void createParticipants(Competition competition) throws PersonCreationException, CompetitionNotFoundException {
     if (!competition) throw new CompetitionNotFoundException("Cannot create participants for ${competition}")
     for (i in 1..competition?.participantCount) {

=== modified file 'grails-app/services/edu/kit/iism/cdamarket/ForecastGenerationService.groovy'
--- grails-app/services/edu/kit/iism/cdamarket/ForecastGenerationService.groovy	2010-05-06 16:01:26 +0000
+++ grails-app/services/edu/kit/iism/cdamarket/ForecastGenerationService.groovy	2010-05-11 14:03:33 +0000
@@ -77,7 +77,7 @@
     Random rand = new Random(person.forecastSeed)
     targetProducts.keySet()?.each {product ->
       def forecastHorizon = product.serialNumber - currentProduct.serialNumber
-      if (forecastHorizon <= competition.timeslotsOpen) { //only generate forecasts that are really needed, i.e. within the forecast horizon
+      if (forecastHorizon < competition.timeslotsOpen) { //only generate forecasts that are really needed, i.e. within the forecast horizon
         def forecast = targetProducts[product].trueValue //assume forecast = trueValue
         if (forecastHorizon > 0) {//forecastTimeslot and current timeslot different -> calculate forecast
           def counter = targetProducts[product].counter

=== modified file 'grails-app/services/edu/kit/iism/cdamarket/JmsBrokerManagementService.groovy'
--- grails-app/services/edu/kit/iism/cdamarket/JmsBrokerManagementService.groovy	2010-05-02 14:36:56 +0000
+++ grails-app/services/edu/kit/iism/cdamarket/JmsBrokerManagementService.groovy	2010-05-11 14:03:33 +0000
@@ -84,11 +84,13 @@
   def initTopicsAndQueues() throws JMSException {
     try {
       BrokerViewMBean mbean = getMBean()
+      /*
       mbean.addQueue ('public.ClientAnnounce')
       mbean.addQueue ('public.ShoutInputQueue')
       mbean.addTopic ('public.CompetitionAnnounce')
       mbean.addTopic ('public.TimeAnnounce')
       mbean.addTopic ('public.Products')
+      */
     } catch (Exception e) {
       throw new JMSException("Failed to initialize topics and queues: ${e.getMessage()}, ${e.getCause()}")
     }

=== modified file 'grails-app/services/edu/kit/iism/cdamarket/ProductManagementService.groovy'
--- grails-app/services/edu/kit/iism/cdamarket/ProductManagementService.groovy	2010-05-06 15:51:27 +0000
+++ grails-app/services/edu/kit/iism/cdamarket/ProductManagementService.groovy	2010-05-11 14:03:33 +0000
@@ -25,7 +25,6 @@
 
   boolean transactional = true
 
-  def topicManagementService
   def balancingPowerService
   def queueOutputService
   def idGeneratorService
@@ -90,13 +89,13 @@
       if (i >= competition.deactivateTimeslotsAhead && i < (competition.deactivateTimeslotsAhead + competition.timeslotsOpen)) {
         product.enabled = true
         if (!product.save()) throw new OpenInitialProductsException("Could enable product '${product}': ${product?.errors}")
-        topicManagementService.sendProductNotification(product, ActionType.Started)
+        queueOutputService.sendProductNotification(product, ActionType.Started)
         log.info("Product opened: ${product}")
       }
     }
     competition.currentCompetitionTime = currentProduct.startDateTime
     competition.save()
-    topicManagementService.sendCompetitionNotification(competition, ActionType.TimeAdvanced)
+    queueOutputService.sendCompetitionNotification(competition, ActionType.TimeAdvanced)
     log.info("New competition time announced: ${currentProduct.startDateTime}")
     log.debug('Trying to send out new forecasts...')
     sendProductForecasts(competition)
@@ -129,7 +128,7 @@
       balancingPowerService.balance(oldestProduct)
     } catch (NoActiveProductException nape) {
       log.error("No active products found -> Competition end reached.")
-      topicManagementService.sendCompetitionNotification(competition, ActionType.Stopped)
+      queueOutputService.sendCompetitionNotification(competition, ActionType.Stopped)
       log.debug('Competition stop announced...')
       quartzScheduler.pauseTrigger('mySimpleTrigger', 'mySimpleTriggerGroup')
       log.debug('Quartz scheduler stopped...')
@@ -161,7 +160,7 @@
       log.debug("Product ${newCurrentProduct} now current product")
       competition.currentCompetitionTime = newCurrentProduct.startDateTime
       competition.save()
-      topicManagementService.sendCompetitionNotification(competition, ActionType.TimeAdvanced)
+      queueOutputService.sendCompetitionNotification(competition, ActionType.TimeAdvanced)
       log.info("New competition time: ${currentProduct.startDateTime}")
     }
   }
@@ -205,7 +204,7 @@
     if (!product) throw new ProductNotFoundException("Could not deactivate product '${product}'")
     product.enabled = enable ?: false
     if (!product.save(flush: true)) throw new ProductEnableDisableException("Failed to set product.enabled to ${enable} for ${product}: ${product?.errors}")
-    topicManagementService.sendProductNotification(product, enable ? ActionType.Started : ActionType.Stopped)
+    queueOutputService.sendProductNotification(product, enable ? ActionType.Started : ActionType.Stopped)
     return product
   }
 

=== modified file 'grails-app/services/edu/kit/iism/cdamarket/QueueOutputService.groovy'
--- grails-app/services/edu/kit/iism/cdamarket/QueueOutputService.groovy	2010-05-02 14:36:56 +0000
+++ grails-app/services/edu/kit/iism/cdamarket/QueueOutputService.groovy	2010-05-11 14:03:33 +0000
@@ -18,41 +18,110 @@
 
 package edu.kit.iism.cdamarket
 
-import edu.kit.iism.cdamarket.ShoutNotificationAction
+import grails.converters.*
 
 class QueueOutputService {
 
   boolean transactional = false
 
+  /*
+  * Private notifications sent to the user specific queue
+  */
+
   def sendOrderstatus(Shout shout) throws ShoutNotFoundException {
-    if (!shout) throw new ShoutNotFoundException ("Cannot send shout '${shout}' to jms.")
-    sendToQueue("users.${shout.person.username}.Orderstatus", shout.toShoutNotificationXml())
+    if (!shout) throw new ShoutNotFoundException("Cannot send shout '${shout}' to jms.")
+    sendToPersonalQueue("users.${shout.person.username}.outputQueue", shout.toShoutNotificationXml())
     return null
   }
 
   def sendCash(CashPosition cashPosition) throws CashPositionNotFoundException {
     if (!cashPosition) throw new CashPositionNotFoundException("Cannot send cashPosition ${cashPosition} to jms.")
-    sendToQueue("users.${cashPosition.person.username}.Cash", cashPosition.toCashPositionNotificationXml())
+    sendToPersonalQueue("users.${cashPosition.person.username}.outputQueue", cashPosition.toCashPositionNotificationXml())
     return null
   }
 
   def sendDepot(DepotPosition depotPosition) throws DepotPositionNotFoundException {
     if (!depotPosition) throw new DepotPositionNotFoundException("Cannot send depotPosition ${depotPosition} to jms.")
-    sendToQueue("users.${depotPosition.person.username}.Depot", depotPosition.toDepotPositionNotificationXml())
+    sendToPersonalQueue("users.${depotPosition.person.username}.outputQueue", depotPosition.toDepotPositionNotificationXml())
     return null
   }
 
   def sendForecast(Forecast forecast) throws ForecastNotFoundException {
     if (!forecast) throw new ForecastNotFoundException("Cannot send forecast ${forecast} to jms.")
-    sendToQueue("users.${forecast.person.username}.Forecast", forecast.toForecastNotificationXml())
-    return null
-  }
-
-  def sendToQueue(queueName, object) {
-    try {
-      sendQueueJMSMessage(queueName, object)
-    } catch (Exception e) {
-      log.error("Failed to send '${object}' to queue ${queueName}. ", e)
+    sendToPersonalQueue("users.${forecast.person.username}.outputQueue", forecast.toForecastNotificationXml())
+    return null
+  }
+
+  def sendToPersonalQueue(queueName, xmlString) { // sends one XML notification string to the given queue; always returns null
+    try {
+      sendQueueJMSMessage(queueName, xmlString)
+    } catch (Exception e) {
+      log.error("Failed to send Notification '${xmlString}' to queue ${queueName}.", e) // a send failure is logged and not rethrown
+    }
+    return null
+  }
+
+  /*
+  * Public information sent to all private queues
+  */
+
+  def sendCompetitionNotification(Competition competition, ActionType actionType) throws CompetitionNotFoundException { // converts the competition to an XML notification and hands it to sendToQueues; returns null
+    if (!competition) throw new CompetitionNotFoundException("Cannot send system notification for competition ${competition} to jms.")
+    def competitionNotification = competition.toCompetitionNotification()
+    if (actionType) competitionNotification.action = actionType // the action is only overridden when an actionType is given
+    String xmlString = competitionNotification as XML
+    sendToQueues(xmlString)
+    log.info "Competition announcement sent: ${xmlString}"
+    return null
+
+  }
+
+  //send a product notification of current product status ("Product opened" | "Product closed") and properties of respective product object
+
+  def sendProductNotification(Product product, ActionType actionType) throws ProductNotFoundException { // converts the product to an XML notification and hands it to sendToQueues; returns null
+    if (!product) throw new ProductNotFoundException("Cannot send product notification for product ${product} to jms.")
+    def productNotification = product.toProductNotification()
+    if (actionType) productNotification.action = actionType // the action is only overridden when an actionType is given
+    String xmlString = productNotification as XML
+    sendToQueues(xmlString)
+    log.info "Product announcement sent: ${xmlString}"
+    return null
+  }
+
+  /*
+  * send quote, trade and orderbook notifications with properties of respective objects
+  */
+
+  def sendQuoteNotification(TransactionLog quote) throws QuoteNotFoundException { // hands the quote's XML notification to sendToQueues; returns null
+    if (!quote) throw new QuoteNotFoundException("Cannot send quote ${quote} to jms.")
+    String xmlString = quote.toQuoteLogNotificationXml()
+    sendToQueues(xmlString)
+    log.info "Quote announcement sent: ${xmlString}"
+    return null
+  }
+
+  def sendTradeNotification(TransactionLog trade) throws TradeNotFoundException { // hands the trade's XML notification to sendToQueues; returns null
+    if (!trade) throw new TradeNotFoundException("Cannot send trade ${trade} to jms.") // message names the trade, not a quote
+    String xmlString = trade.toTradeLogNotificationXml()
+    sendToQueues(xmlString)
+    log.info "Trade announcement sent: ${xmlString}"
+    return null
+  }
+
+  def sendOrderbookNotification(Orderbook orderbook) throws OrderbookException { // hands the orderbook's XML notification to sendToQueues; returns null
+    if (!orderbook) throw new OrderbookException("Cannot send Orderbook ${orderbook} to jms.")
+    String xmlString = orderbook.toOrderbookNotificationXml()
+    sendToQueues(xmlString)
+    log.info "Ordebook announcement sent: ${xmlString}" // NOTE(review): "Ordebook" is a typo in the log text - confirm nothing matches on it before correcting
+    return null
+  }
+
+  def sendToQueues(String xmlString) { // sends one XML notification addressed to the current competition's stored queueNames string; returns null
+    String queueNames = Competition.findByCurrent(true).queueNames // NOTE(review): fails outside the try when no current competition is found - confirm callers guarantee one
+    try {
+      sendQueueJMSMessage(queueNames, xmlString) // presumably the comma-separated names are resolved to all user queues by the JMS layer - TODO confirm
+    } catch (Exception e) {
+      log.error("Failed to send Notification '${xmlString}' to queues ${queueNames}.", e) // uses this method's own variables so the handler can actually log the failure
     }
     return null
   }

=== removed file 'grails-app/services/edu/kit/iism/cdamarket/TopicManagementService.groovy'
--- grails-app/services/edu/kit/iism/cdamarket/TopicManagementService.groovy	2010-05-02 14:36:56 +0000
+++ grails-app/services/edu/kit/iism/cdamarket/TopicManagementService.groovy	1970-01-01 00:00:00 +0000
@@ -1,85 +0,0 @@
-/*
- * Copyright 2009-2010 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an
- *
- * "AS IS" BASIS,  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- *
- * either express or implied. See the License for the specific language
- * governing permissions and limitations under the License.
- */
-
-package edu.kit.iism.cdamarket
-
-import grails.converters.*
-
-class TopicManagementService {
-
-  boolean transactional = false
-  static pubSub = true
-
-  def sendCompetitionNotification(Competition competition, ActionType actionType) throws CompetitionNotFoundException {
-    if (!competition) throw new CompetitionNotFoundException("Cannot send system notification for competition ${competition} to jms.")
-    def competitionNotification = competition.toCompetitionNotification()
-    if (actionType) competitionNotification.action = actionType
-    String xmlString = competitionNotification as XML
-    sendToTopic("public.CompetitionAnnounce", xmlString)
-    log.info "Competition announcement sent: ${xmlString}"
-    return null
-  }
-
-  //send a product product notification of current product status ("Product opened" | "Product closed") and properties of respective product object
-  def sendProductNotification(Product product, ActionType actionType) throws ProductNotFoundException {
-    if (!product) throw new ProductNotFoundException("Cannot send product notification for product ${product} to jms.")
-    def productNotification = product.toProductNotification()
-    if (actionType) productNotification.action = actionType
-    String xmlString = productNotification as XML
-    sendToTopic("public.Products", xmlString)
-    log.info "Product announcement sent: ${xmlString}"
-    return null
-  }
-
-  /*
-  * send quote, trade and orderbook notifications with properties of respective objects
-  */
-  def sendQuoteNotification(TransactionLog quote) throws QuoteNotFoundException {
-    if (!quote) throw new QuoteNotFoundException ("Cannot send quote ${quote} to jms.")
-    String xmlString = quote.toQuoteLogNotificationXml()
-    sendToTopic("public.Quote", xmlString)
-    log.info "Quote announcement sent: ${xmlString}"
-    return null
-  }
-
-  def sendTradeNotification(TransactionLog trade) throws TradeNotFoundException {
-    if (!trade) throw new TradeNotFoundException ("Cannot send quote ${trade} to jms.")
-    String xmlString = trade.toTradeLogNotificationXml()
-    sendToTopic("public.Trade", xmlString)
-    log.info "Trade announcement sent: ${xmlString}"
-    return null
-  }
-
-  def sendOrderbookNotification(Orderbook orderbook) throws OrderbookException {
-    if (!orderbook) throw new OrderbookException("Cannot send Orderbook ${orderbook} to jms.")
-    String xmlString = orderbook.toOrderbookNotificationXml()
-    sendToTopic("public.Orderbook", xmlString)
-    log.info "Ordebook announcement sent: ${xmlString}"
-    return null
-  }
-
-  def sendToTopic(topicName, object) {
-    try {
-      sendPubSubJMSMessage(topicName, object)
-    } catch (Exception e) {
-      log.error("Failed to send object '${object}' to topic ${topicName}.", e)
-    }
-    return null
-  }
-
-}