Content

Installation

on pslogin
ana-1.3.37
scs
cd ...
virtualenv venv-pymongo
source venv-pymongo/bin/activate

???
# python -m pip install pymongo 


Alternative installation:
-------------------------
# https://docs.mongodb.com/manual/tutorial/install-mongodb-on-linux/
cd lib

curl -O https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-3.6.2.tgz

tar -zxvf mongodb-linux-x86_64-3.6.2.tgz

mkdir -p mongodb
cp -R -n mongodb-linux-x86_64-3.6.2/ mongodb

export PATH=/reg/neh/home/dubrovin/LCLS/venv-pymongo/lib/mongodb/mongodb-linux-x86_64-3.6.2/bin/:$PATH
echo $PATH
The same in 
source set_path_to_mongodb

1. Create the data directory
mkdir -p ./data/db

2. Set r/w permissions for the data directory
chmod 775 data
chmod 775 data/db 

Run server

pslogin
ssh psanaphi105
cd LCLS/venv-pymongo/
source bin/activate
source set_path_to_mongodb
assumes that ./data/db is already created 
mongod --dbpath ./data/db --bind_ip_all &

!!! DO NOT CLOSE WINDOW, 

Shell

Shell is a manual command line interface.

mongo --host psanaphi105 --port 27017

To exit the shell, type quit() or use the <Ctrl-C> shortcut.

> db
test

> show dbs
admin           0.000GB
calib-cxif5315  0.006GB
config          0.000GB
local           0.000GB

> use calib-cxif5315
switched to db calib-cxif5315

> show collections
cspad-0-cxids1-0
cspad-1

> db["cspad-0-cxids1-0"].find()
> db["cspad-0-cxids1-0"].find().pretty()

# Delete databale:
use calib-cxif5315 
db.dropDatabase()

# Delete collection
db.collection.drop()
# OR:
db["cspad-0-cxids1-0"].drop()

> help

# Export/backup database in file
> mongodump -d <dbname> --archive <filename> --out /path/to/backup/dir

# Import database from file
mongorestore -d <dbname> --archive <filename>

Connection to DB in python

from pymongo import MongoClient
#client = MongoClient('localhost', 27017)
client = MongoClient('psanaphi105', 27017) #, username=uname, password=pwd)
db = client['calib-cxi12345']
col = db['camera-0-cxids1-0']

Connection time is 50-150ms depending on host and time.

Python API

 address                  get_database             max_idle_time_ms         read_concern              
 arbiters                 get_default_database     max_message_size         read_preference           
 close                    HOST                     max_pool_size            secondaries               
 close_cursor             is_locked                max_write_batch_size     server_info               
 codec_options            is_mongos                min_pool_size            server_selection_timeout 
 database_names           is_primary               next                     set_cursor_manager       
 drop_database            kill_cursors             nodes                    unlock                   
 event_listeners          local_threshold_ms       PORT                     write_concern            
 fsync                    max_bson_size            primary                                           
             add_son_manipulator           error                         previous_error                
             add_user                      eval                          profiling_info                
             authenticate                  get_collection                profiling_level               
             client                        incoming_copying_manipulators read_concern                  
             codec_options                 incoming_manipulators         read_preference               
             collection_names              last_status                   remove_user                   
             command                       logout                        reset_error_history           
             create_collection             name                          set_profiling_level           
             current_op                    next                          system_js                     
             dereference                   outgoing_copying_manipulators validate_collection           
             drop_collection               outgoing_manipulators         write_concern
               aggregate                    find_one                     next                         
               bulk_write                   find_one_and_delete          options                      
               codec_options                find_one_and_replace         parallel_scan                
               count                        find_one_and_update          read_concern                 
               create_index                 full_name                    read_preference              
               create_indexes               group                        reindex                      
               database                     index_information            remove                       
               delete_many                  initialize_ordered_bulk_op   rename                       
               delete_one                   initialize_unordered_bulk_op replace_one                  
               distinct                     inline_map_reduce            save                         
               drop                         insert                       update                       
               drop_index                   insert_many                  update_many                  
               drop_indexes                 insert_one                   update_one                   
               ensure_index                 list_indexes                 with_options                 
               find                         map_reduce                   write_concern                
               find_and_modify              name                                                      
              add_option()        count()             max_time_ms()        
              address             cursor_id           min()                
              alive               distinct()          next()               
              batch_size()        explain()           remove_option()      
              clone()             hint()              retrieved           
              close()             limit()             rewind()            
              collation()         max()               skip()              
              collection          max_await_time_ms() sort()              
              comment()           max_scan()          where()             
             clear()      get()        pop()        update()               
             copy()       items()      popitem()    values()               
             fromkeys()   keys()       setdefault()                        

 

Tentative model of the calibration store

Experiment-centric calibration data base

# Database for experiment
dbexp = client("db-cxif5315")
fs = big_data_file_system(dbexp)
 
# Collections:
"cspad-0-cxids2-0"
"cspad2x2-0-cxids2-0"
"andor-0-cxids2-0"
...
# Auto-generated collections
'fs.files'
'fs.chunks'

# Document content for dbexp
doc = {
   "_id":ObjectId("53402597d852426020000002"), # auto-generated
   "experiment": "cxif5315",
   "run": "123",
   "detector": "cspad-0-cxids2-0",
   "ctype": "pedestals",
   "time_sec": "1516321053",
   "time_stamp": "2018-01-18T16:17:33-0800",
   "version": "v00",
   "uid": "login-name",
   "host": "psanaphi102",
   "port": "12345",
   "comment": "very good constants",
   "id_data": "5a98464a5777035bba3a4f41"  # added as a reference to big data
 }

All meta-data information is accessible through a single-level document.

Detector-centric calibration data base

# Model #1: DB per detector type, collection per detector:
-----------------------------------------------------------
dbdet = client('db-cspad')

# Collections:
'cspad-0-cxids1-0'
'cspad-0-cxids2-0'
'cspad-0-cxidsd-0'
'cspad-0-xcsendstation-0'
'cspad-0-xppgon-0'
'cspad-0-sxrbeamline-1'
'cspad-0-mectargetchamber-0'

# Document content for dbdet the same as dbexp plus "id_data"
doc = {...
   "id_data": ObjectId("534009e4d852427820000002"),
   ...
}

# Model #2: DB per detector, one collection per detector:
---------------------------------------------------------
dbdet = client('db-cspad-0-cxids1-0')
col = dbdet['cspad-0-cxids1-0']

# Add collections in case of DB copy
'fs.files'
'fs.chunks'


Essentially document in the detector collection has a reference to the data in the experiment collections.

 

Data flow for documents less than 16 MB

Preparation of data

nda = gu.random_standard(shape=(32,185,388), mu=20, sigma=5, dtype=gu.np.float)

import pickle
from bson.binary import Binary

t0_sec = time()

arr = nda.flatten()
arr = ' '.join(['%.2f' % v for v in arr])
sarr = Binary(pickle.dumps(arr, protocol=2), subtype=128)

doc = {
   "experiment": "cxi12345",
   "run": 124,
   ...
   "data": sarr,
}

dt_sec = time() - t0_sec

Inserting data

doc_id = col.insert_one(doc).inserted_id

Insertion time is 110-180ms.

Find data

t0_sec = time()
docs = col.find({"run": 125})
dt_sec = time() - t0_sec

Finding data time is 50-60us

Unpack data

doc = docs[0]
xcarr = pickle.loads(doc["data"]) # 30-40ms
arr = gu.np.fromstring(xcarr, dtype=float, count=-1, sep=' ') # 300ms

Time to unpack is 350ms.

Data flow for large documents

Timing test is done for mongod running on psanaphi105 and scripts on psanagpu106.

Initialization

import gridfs
from pymongo import MongoClient
#client = MongoClient('localhost')
client = MongoClient('psanaphi105', 27017)
db = client['calib-cxi12345']
fs = gridfs.GridFS(db)
col = db['camera-0-cxids1-0']

Time to connect 116-150ms.

Put

ida = fs.put(nda.tobytes())

Time to save data 330-420ms.

doc = {
   "experiment": "cxi12345",
   "run": 126,
   "detector": col.name,
   "ctype": "pedestals",
   "data_size":  nda.size,
   "data_shape": nda.shape,
   "data_type":  str(nda.dtype),
   "data_id":    ida,
	...
}
doc_id = col.insert_one(doc).inserted_id

Document meta-data with reference to data preparation time is 43-53us.

Insert metadata time 0.5-0.6ms.

Get

docs = col.find({"time_stamp" : "2018-01-25T09:33:10PST"})
doc = docs[0]

Metadata find and get time: 0.7ms

s = fs.get(doc['data_id']).read()
nda = gu.np.fromstring(s)

Data extraction time: 96ms. Thus returned array is "flattend" and needs to be shaped.

Interface from Murali

2018-08-03 e-mail from Murali:
I have installed Mongo 4.0 on psdb-dev. I was hoping to use their REST service but this seems to have been deprecated and eliminated since 3.6. 
So, I knocked a quick web service and have proxied it from pswww. This web service (https://github.com/slaclab/psdm_mongo_ws) is a suggestion only; please let me know if you need something different. 
These are examples of getting data over HTTPS from a batch node from within cori; needless to say, the URL prefix is https://pswww.slac.stanford.edu/calib_ws

Two users:

Test commands:

Implementation

Write web access

 

2019-07-27
Here's version 1; any feedback is appreciated.
Regards,
Murali

#!/usr/bin/env python

"""
Sample for posting to the calibration service using a web service and kerberos authentication.
Make sure we have a kerberos ticket.
"""

import requests
import json
from krtc import KerberosTicket
from urllib.parse import urlparse

ws_url = "https://pswww.slac.stanford.edu/ws-kerb/calib_ws/"
krbheaders = KerberosTicket("HTTP@" + urlparse(ws_url).hostname).getAuthHeaders()

# Create a new document in the collection test_coll in the database test_db.
resp = requests.post(ws_url+"test_db/test_coll/", headers=krbheaders, json={"calib_count": 1})
print(resp.text)
new_id = resp.json()["_id"]

# Update an existing document
resp = requests.put(ws_url+"test_db/test_coll/"+new_id, headers=krbheaders, json={"calib_count": 2})
print(resp.text)

# Delete an existing document
resp = requests.delete(ws_url+"test_db/test_coll/"+new_id, headers=krbheaders)
print(resp.text)

# Create a new GridFS document, we upload an image called small_img.png
files = [("files",  ('small_img.png', open('small_img.png', 'rb'), 'image/png'))]
resp = requests.post(ws_url+"test_db/gridfs/", headers=krbheaders, files=files)
print(resp.text)
new_id = resp.json()["_id"]

# Delete the GridFS document
resp = requests.delete(ws_url+"test_db/gridfs/"+new_id, headers=krbheaders)
print(resp.text)

 

Summary

References