"""Module to handle a job."""
# Copyright 2017 Qarnot computing
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import datetime
from .task import Task
from . import get_url, raise_on_error, _util
from .exceptions import MaxJobException, NotEnoughCreditsException, MissingJobException, UnauthorizedException
class JobState:
    """String constants naming the states a job can be in.

    The values are plain strings so they compare equal to the ``state``
    field stored on a job.
    """

    # The job is active.
    Active = "Active"
    # Termination was requested; ``Job.terminate`` sets this state locally.
    Terminating = "Terminating"
    # The job is completed.
    Completed = "Completed"
    # Deletion was requested; ``Job.delete`` sets this state locally.
    Deleting = "Deleting"
class Job(object):
    """Represents a Qarnot job.

    .. note::
       A :class:`Job` must be created with
       :meth:`qarnot.connection.Connection.create_job`
       or retrieved with :meth:`qarnot.connection.Connection.jobs` or
       :meth:`qarnot.connection.Connection.retrieve_job`.
    """

    def __init__(self, connection, name, pool=None, shortname=None, use_dependencies=False):
        """Create a new :class:`Job`.

        :param connection: the cluster on which to send the job
        :type connection: :class:`~qarnot.connection.Connection`
        :param name: given name of the job
        :type name: :class:`str`
        :param pool: which Pool to submit the job in; either a pool object
          (its ``uuid`` attribute is used), a pool uuid string, or None
        :type pool: :class:`~qarnot.pool.Pool` or :class:`str` or None
        :param shortname: userfriendly job name
        :type shortname: :class:`str`
        :param use_dependencies: allow dependencies between tasks in this job
        :type use_dependencies: :class:`bool`
        """
        self._connection = connection
        self._name = name
        self._shortname = shortname
        self._pool_uuid = self._resolve_pool_uuid(pool)
        self._state = ""
        self._uuid = None
        # NOTE(review): this is naive *local* time, while the ``creation_date``
        # docs speak of UTC; it is replaced by the server value on update.
        self._creation_date = datetime.datetime.now()
        self._use_dependencies = use_dependencies
        self._max_wall_time = None
        self._update_cache_time = 5
        self._auto_update = True
        self._last_auto_update_state = self._auto_update
        self._tags = []
        self._last_modified = None
        # The cache clock starts at construction: a non-forced update() within
        # the first ``_update_cache_time`` seconds is therefore skipped.
        self._last_cache = time.time()
        self._completion_time_to_live = "00:00:00"
        self._auto_delete = False
        self._previous_state = None
        self._state_transition_time = None
        self._previous_state_transition_time = None

    @staticmethod
    def _resolve_pool_uuid(pool):
        """Return the pool uuid designated by ``pool``.

        :param pool: a pool object exposing ``uuid``, a uuid string, or None
        :returns: the uuid, or None when no pool is given
        """
        if pool is None:
            return None
        if _util.is_string(pool):
            return pool
        return pool.uuid

    @staticmethod
    def _raise_for_job_response(response):
        """Raise if ``response`` reports a failure on a job endpoint.

        A 404 status raises :class:`~qarnot.exceptions.MissingJobException`;
        any other status is handed to ``raise_on_error``.
        """
        if response.status_code == 404:
            raise MissingJobException(_util.get_error_message_from_http_response(response))
        raise_on_error(response)

    @property
    def auto_update(self):
        """:type: :class:`bool`

        :getter: Returns this job's auto update state
        :setter: Sets this job's auto update state

        Auto update state, default to True.
        When auto update is disabled properties will always return cached value
        for the object and a call to :meth:`update` will be required to get
        latest values from the REST Api.
        """
        return self._auto_update

    @auto_update.setter
    def auto_update(self, value):
        """Setter for auto_update feature."""
        self._auto_update = value
        self._last_auto_update_state = self._auto_update

    @property
    def update_cache_time(self):
        """:type: :class:`int`

        :getter: Returns this job's cache expiration time
        :setter: Sets this job's cache expiration time

        Cache expiration time in seconds, default to 5s.
        """
        return self._update_cache_time

    @update_cache_time.setter
    def update_cache_time(self, value):
        """Setter for the cache expiration time (seconds)."""
        self._update_cache_time = value

    @property
    def state(self):
        """:type: :class:`str`

        :getter: return this job's state

        State of the job.

        Value is in

        * UnSubmitted
        * Active,
        * Terminating,
        * Completed,
        * Deleting

        .. warning::
           this is the state of the job when the object was retrieved,
           call :meth:`update` for up to date value.
        """
        if self._auto_update:
            self.update()
        return self._state

    @property
    def tasks(self):
        """:type: List of :class:`~qarnot.task.Task`

        :getter: Returns this job tasks

        The tasks submitted in this job. A job that has not been submitted
        yet has no tasks, so an empty list is returned without contacting
        the server.

        .. note:: The tasks need to be actually submitted with
           :meth:`.Task.submit` to be recognized and displayed by the method
           :meth:`~qarnot.job.Job.tasks`

        :raises ~qarnot.exceptions.MissingJobException: job does not exist
        """
        if self._uuid is None:
            # Always hand back a list so callers can iterate unconditionally.
            return []
        response = self._connection._get(get_url('job tasks', uuid=self._uuid))
        self._raise_for_job_response(response)
        # NOTE(review): the job itself (not ``self._connection``) is passed as
        # first argument - confirm against Task.from_json's signature.
        return [Task.from_json(self, task, True) for task in response.json()]

    @property
    def use_dependencies(self):
        """:type: :class:`bool`

        :getter: task's job can have dependencies
        :setter: Set if there is task's job dependencies

        Can be set until :meth:`submit` is called.
        """
        return self._use_dependencies

    @use_dependencies.setter
    def use_dependencies(self, value):
        """Setter for use_dependencies; refused once the job has a uuid."""
        if self._uuid is not None:
            raise AttributeError("can't set attribute on a submitted job")
        self._use_dependencies = value

    @property
    def uuid(self):
        """:type: :class:`str`

        :getter: Returns this job's uuid

        The job's uuid.

        Automatically set when a job is submitted.
        """
        return self._uuid

    @property
    def name(self):
        """:type: :class:`str`

        :getter: Returns this job's name
        :setter: Sets this job's name

        The job's name.

        Can be set until job is submitted.
        """
        return self._name

    @name.setter
    def name(self, value):
        """Setter for name; refused once the job has a uuid."""
        if self._uuid is not None:
            raise AttributeError("can't set attribute on a submitted job")
        self._name = value

    @property
    def shortname(self):
        """:type: :class:`str`

        :getter: Returns this job's shortname
        :setter: Sets this job's shortname

        The job's shortname, must be DNS compliant and unique,
        if not provided, will default to :attr:`uuid`.

        Can be set until job is submitted.
        """
        return self._shortname

    @shortname.setter
    def shortname(self, value):
        """Setter for shortname; refused once the job has a uuid."""
        if self._uuid is not None:
            raise AttributeError("can't set attribute on a submitted job")
        self._shortname = value

    @property
    def creation_date(self):
        """:type: :class:`str`

        :getter: Returns this job's creation date

        Creation date of the job (UTC Time)
        """
        return self._creation_date

    @property
    def max_wall_time(self):
        """:type: :class:`str`

        :getter: Returns this job's maximum wall time
        :setter: Sets this job's maximum wall time

        The job's maximum wall time.

        It is a time span string.
        Format example: ``d.hh:mm:ss`` or ``hh:mm:ss``

        Can be set until job is submitted.
        """
        return self._max_wall_time

    @max_wall_time.setter
    def max_wall_time(self, value):
        """Setter for maximum wall time.

        Accepts a time span string (``d.hh:mm:ss`` or ``hh:mm:ss``), stored
        as given, or a :class:`datetime.timedelta`, converted to a time span
        string.

        :raises AttributeError: the job is already submitted
        :raises TypeError: ``value`` is neither a string nor a timedelta
        """
        if self._uuid is not None:
            raise AttributeError("can't set attribute on a submitted job")
        if _util.is_string(value):
            self._max_wall_time = value
        elif isinstance(value, datetime.timedelta):
            self._max_wall_time = _util.convert_timedelta_to_timespan_string(value)
        else:
            raise TypeError("Maximum wall time must be a time span format string (example: ``d.hh:mm:ss`` or ``hh:mm:ss`` )")

    @property
    def pool(self):
        """:type: :class:`~qarnot.pool.Pool`

        :getter: Returns this job's pool, or None when the job has no pool
        :setter: Sets this job's pool

        The pool to run the job in.

        Can be set until :meth:`submit` is called.
        """
        if self._pool_uuid is None:
            # No pool attached: do not ask the connection to retrieve "None".
            return None
        return self._connection.retrieve_pool(self._pool_uuid)

    @pool.setter
    def pool(self, value):
        """Setter for pool.

        Accepts the same values as the constructor's ``pool`` argument:
        a pool object, a pool uuid string, or None to detach the pool.

        :raises AttributeError: the job is already submitted
        """
        if self._uuid is not None:
            raise AttributeError("can't set attribute on a submitted job")
        self._pool_uuid = self._resolve_pool_uuid(value)

    @staticmethod
    def _retrieve(connection, uuid):
        """Retrieve a submitted job given its uuid.

        :param qarnot.connection.Connection connection:
          the cluster to retrieve the job from
        :param str uuid: the uuid of the job to retrieve

        :rtype: Job
        :returns: The retrieved job.

        :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
        :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
        :raises ~qarnot.exceptions.MissingJobException: no such job
        """
        resp = connection._get(get_url('job update', uuid=uuid))
        Job._raise_for_job_response(resp)
        return Job.from_json(connection, resp.json())

    @classmethod
    def from_json(cls, connection, payload):
        """Create a Job object from a json job.

        :param qarnot.connection.Connection connection: the cluster connection
        :param dict payload: Dictionary representing the job

        :returns: The created :class:`~qarnot.job.Job`.
        """
        job = cls(connection,
                  payload.get("name"),
                  payload.get("poolUuid"),
                  payload.get("shortname"),
                  payload.get("useDependencies"))
        job._uuid = payload.get("uuid")
        job._state = payload.get("state")
        # NOTE(review): _update() runs creationDate through
        # _util.parse_datetime and keeps maxWallTime unparsed, the opposite of
        # what is done here - confirm which representation is intended.
        job._creation_date = payload.get("creationDate")
        job._last_modified = payload.get("lastModified")
        job._max_wall_time = _util.parse_timedelta(payload.get("maxWallTime"))
        job._auto_delete = payload.get("autoDeleteOnCompletion")
        job._completion_time_to_live = payload.get("completionTimeToLive")
        # Also take the fields that _update() reads: the cache clock was just
        # started by __init__, so the first auto-update is skipped and these
        # values would otherwise stay at their defaults for a while.
        job._tags = payload.get("tags") or []
        job._previous_state = payload.get("previousState")
        job._state_transition_time = payload.get("stateTransitionTime")
        job._previous_state_transition_time = payload.get("previousStateTransitionTime")
        return job

    def _to_json(self):
        """Get a dict ready to be json packed from this job."""
        return {
            'name': self._name,
            'poolUuid': self._pool_uuid,
            'shortname': self._shortname,
            'state': self._state,
            'tags': self._tags,
            'useDependencies': self._use_dependencies,
            'maxWallTime': self._max_wall_time,
            'autoDeleteOnCompletion': self._auto_delete,
            'completionTimeToLive': self._completion_time_to_live,
        }

    def _update(self, json_job):
        """Update this job from retrieved info.

        :param dict json_job: Dictionary representing the job
        """
        self._uuid = json_job.get('uuid')
        self._name = json_job.get('name')
        self._shortname = json_job.get('shortname')
        self._pool_uuid = json_job.get('poolUuid')
        self._use_dependencies = json_job.get('useDependencies')
        self._state = json_job.get('state')
        self._creation_date = _util.parse_datetime(json_job.get('creationDate'))
        self._last_modified = json_job.get('lastModified')
        self._max_wall_time = json_job.get('maxWallTime')
        self._tags = json_job.get('tags', None)
        self._previous_state = json_job.get('previousState', None)
        self._state_transition_time = json_job.get('stateTransitionTime', None)
        self._previous_state_transition_time = json_job.get('previousStateTransitionTime', None)

    def submit(self):
        """Submit job to the cluster if it is not already submitted.

        :returns: the cached state when the job already has a uuid (nothing
          is sent in that case), otherwise None

        :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
        :raises ~qarnot.exceptions.MaxJobException: Job quota reached
        :raises ~qarnot.exceptions.NotEnoughCreditsException: Not enough credits
        :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
        :raises ~qarnot.exceptions.MissingJobException: the server answered 404
        """
        if self._uuid is not None and self._uuid != "":
            return self._state
        payload = self._to_json()
        resp = self._connection._post(get_url('jobs'), json=payload)

        if resp.status_code == 404:
            raise MissingJobException(_util.get_error_message_from_http_response(resp))
        elif resp.status_code == 403:
            # 403 covers both the job quota and plain authorization failures;
            # the error message is the only way to tell them apart.
            error_message = _util.get_error_message_from_http_response(resp)
            if "maximum number of jobs reached" in error_message.lower():
                raise MaxJobException(error_message)
            raise UnauthorizedException(error_message)
        elif resp.status_code == 402:
            raise NotEnoughCreditsException(_util.get_error_message_from_http_response(resp))
        raise_on_error(resp)

        self._uuid = resp.json().get('uuid')
        # Force a refresh: the cache clock started in __init__ may not have expired.
        self.update(True)

    def update(self, flushcache=False):
        """Update the job object from the REST Api.

        The flushcache parameter can be used to force the update, otherwise
        a cached version of the object will be served when accessing
        properties of the object.
        Cache behavior is configurable with :attr:`auto_update` and
        :attr:`update_cache_time`.

        Nothing is done for a job that has no uuid.

        :param bool flushcache: refresh even if the cache has not expired

        :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
        :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
        :raises ~qarnot.exceptions.MissingJobException: job does not exist
        """
        if self._uuid is None:
            return

        now = time.time()
        if (now - self._last_cache) < self._update_cache_time and not flushcache:
            return

        resp = self._connection._get(
            get_url('job update', uuid=self._uuid))
        self._raise_for_job_response(resp)
        self._update(resp.json())
        self._last_cache = time.time()

    def terminate(self):
        """Terminate this job on the server and abort all remaining tasks in the job.

        Nothing is done for a job that has no uuid.

        :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
        :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials
        :raises ~qarnot.exceptions.MissingJobException: job does not exist
        """
        if self._uuid is None:
            return

        resp = self._connection._post(get_url('job terminate', uuid=self._uuid))
        self._raise_for_job_response(resp)
        self._state = JobState.Terminating

    def delete(self, forceAbort=False):
        """Delete this job on the server.

        The forceAbort parameter can be used to force running task in the
        job to be aborted.

        Nothing is done for a job that has no uuid. On success the local
        state becomes ``Deleting`` and the uuid is cleared.

        :param bool forceAbort: forwarded as ``force`` to the delete url

        :raises ~qarnot.exceptions.QarnotGenericException: API general error, see message for details
        :raises ~qarnot.exceptions.UnauthorizedException: invalid credentials,
          or job still contains running tasks
        :raises ~qarnot.exceptions.MissingJobException: job does not exist
        """
        if self._uuid is None:
            return

        resp = self._connection._delete(get_url('job delete', uuid=self._uuid, force=forceAbort))
        if resp.status_code == 404:
            raise MissingJobException(_util.get_error_message_from_http_response(resp))
        elif resp.status_code == 403:
            raise UnauthorizedException(_util.get_error_message_from_http_response(resp))
        raise_on_error(resp)

        self._state = JobState.Deleting
        self._uuid = None

    @property
    def auto_delete(self):
        """Autodelete this job if it is finished and your max number of job is reach

        Can be set until :meth:`submit` is called.

        :type: :class:`bool`
        :getter: Returns is this job must autodelete
        :setter: Sets this job's autodelete
        :default_value: "False"
        :raises AttributeError: if you try to reset the auto_delete after the job is submit
        """
        return self._auto_delete

    @auto_delete.setter
    def auto_delete(self, value):
        """Setter for auto_delete, this can only be set before job's submission."""
        if self.uuid is not None:
            raise AttributeError("can't set attribute on a launched job")
        self._auto_delete = value

    @property
    def completion_ttl(self):
        """The job will be auto delete `completion_ttl` after it is finished

        Can be set until :meth:`submit` is called.

        :getter: Returns this job's completed time to live.
        :type: :class:`str`
        :setter: Sets this job's completed time to live.
        :type: :class:`str` or :class:`datetime.timedelta`
        :default_value: "00:00:00"
        :raises AttributeError: if you try to set it after the job is submitted

        The `completion_ttl` must be a timedelta or a time span format string
        (example: ``d.hh:mm:ss`` or ``hh:mm:ss`` )
        """
        return self._completion_time_to_live

    @completion_ttl.setter
    def completion_ttl(self, value):
        """Setter for completion_ttl, this can only be set before job's submission."""
        if self._uuid is not None:
            raise AttributeError("can't set attribute on a submitted job")
        self._completion_time_to_live = _util.parse_to_timespan_string(value)

    @property
    def tags(self):
        """:type: :class:list(`str`)

        :getter: Returns this job's tags
        :setter: Sets this job's tags

        Custom tags.
        """
        if self._auto_update:
            self.update()
        return self._tags

    @tags.setter
    def tags(self, value):
        """Setter for tags."""
        # NOTE(review): on a submitted job the new value is only stored
        # locally and the next refresh overwrites it - confirm this is intended.
        if self._auto_update:
            self.update()
        self._tags = value

    @property
    def previous_state(self):
        """
        :type: :class:`str`
        :getter: Returns the running job's previous state
        """
        return self._previous_state

    @property
    def state_transition_time(self):
        """
        :type: :class:`str`
        :getter: Returns the running job's transition state time

        job state transition time (UTC Time)
        """
        return self._state_transition_time

    @property
    def previous_state_transition_time(self):
        """
        :type: :class:`str`
        :getter: Returns the running job's previous transition state time

        job previous state transition time (UTC Time)
        """
        return self._previous_state_transition_time

    def __repr__(self):
        """Return a one-line summary of the job.

        Reading ``self.state`` may trigger an auto-update.
        """
        return '{0} - {1} - {2} - Pool : {3} - {4} - Tags: {5} - UseDependencies : {6} '\
            .format(self.name,
                    self.shortname,
                    self._uuid,
                    self._pool_uuid,
                    self.state,
                    self._tags,
                    self._use_dependencies)