event package - github.com/AliceO2Group/Control/common/event - Go Packages

Package event provides event publishing and streaming functionality for O² Control components, supporting Kafka and dummy event writers.


var (
	KAFKAWRITER  = "kafka_writer"
	KAFKAPREPARE = "kafka_prepare"
)


type AgentFailedEvent struct {
	AgentId mesos.AgentID `json:"agentId"`
	
}
func (e *AgentFailedEvent) GetTimestamp() string
func (e *AgentFailedEvent) SetLabels(labels map[string]string)
type AnnounceTaskPIDEvent struct {
	TaskId string `json:"taskId"`
	PID    int32  `json:"pid"`
	
}
func (e *AnnounceTaskPIDEvent) GetLabels() map[string]string
func (e *AnnounceTaskPIDEvent) GetTaskPID() int
func (e *AnnounceTaskPIDEvent) GetTimestamp() string
func (e *AnnounceTaskPIDEvent) SetLabels(labels map[string]string)
type BasicTaskTerminated struct {
	DeviceEventBase
	ExitCode             int             `json:"exitCode"`
	Stdout               string          `json:"stdout"`
	Stderr               string          `json:"stderr"`
	VoluntaryTermination bool            `json:"voluntaryTermination"`
	FinalMesosState      mesos.TaskState `json:"finalMesosState"`
}
func (e *BasicTaskTerminated) GetTimestamp() string
func (e *BasicTaskTerminated) SetLabels(labels map[string]string)
type DeviceEvent interface {
	Event
	GetOrigin() DeviceEventOrigin
	GetType() pb.DeviceEventType
}
func NewDeviceEvent(origin DeviceEventOrigin, t pb.DeviceEventType) (de DeviceEvent)
type DeviceEventBase struct {
	Type   pb.DeviceEventType `json:"type"`
	Origin DeviceEventOrigin  `json:"origin"`
	
}
func (b *DeviceEventBase) GetOrigin() DeviceEventOrigin
func (e *DeviceEventBase) GetTimestamp() string
func (b *DeviceEventBase) GetType() pb.DeviceEventType
func (e *DeviceEventBase) SetLabels(labels map[string]string)
type DeviceEventOrigin struct {
	
	AgentId    mesos.AgentID    `json:"agentId"`
	ExecutorId mesos.ExecutorID `json:"executorId"`
	TaskId     mesos.TaskID     `json:"taskId"`
}
type DummyReader struct{}

DummyReader is an implementation of Reader that returns no events.

type DummyWriter struct{}
func (*DummyWriter) Close()
func (*DummyWriter) WriteEvent(interface{})
func (*DummyWriter) WriteEventWithTimestamp(interface{}, time.Time)
type EndOfStream struct {
	DeviceEventBase
}
func (e *EndOfStream) GetTimestamp() string
func (e *EnvironmentEvent) GetTimestamp() string
func (e *EnvironmentEvent) SetLabels(labels map[string]string)
type ExecutorFailedEvent struct {
	ExecutorId mesos.ExecutorID `json:"executorId"`
	
}
func (e *ExecutorFailedEvent) GetTimestamp() string
func (e *ExecutorFailedEvent) SetLabels(labels map[string]string)
type FifoBuffer[T any] struct {
	
}

FifoBuffer is a thread-safe FIFO whose Pop and PopMultiple functions block until new data is available. It is intended to be shared between multiple goroutines; in single-goroutine code its synchronization mechanisms are wasted overhead.

func NewFifoBuffer[T any]() (result FifoBuffer[T])
func (this *FifoBuffer[T]) Length() int
func (this *FifoBuffer[T]) PopMultiple(numberToPop uint) (result []T)

PopMultiple blocks until the internal buffer holds at least one value.

func (this *FifoBuffer[T]) Push(value T)
func (this *FifoBuffer[T]) ReleaseGoroutines()
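
The blocking behaviour described for FifoBuffer can be illustrated with a small standalone sketch. This is not the package's implementation: blockingFifo, newBlockingFifo and Release are hypothetical names, and the use of sync.Cond is an assumption about one reasonable way to get a pop that waits for data.

```go
package main

import (
	"fmt"
	"sync"
)

// blockingFifo is a hypothetical stand-in for FifoBuffer[T].
// It is a sketch, not the package's actual implementation.
type blockingFifo[T any] struct {
	mu       sync.Mutex
	cond     *sync.Cond
	items    []T
	released bool
}

func newBlockingFifo[T any]() *blockingFifo[T] {
	f := &blockingFifo[T]{}
	f.cond = sync.NewCond(&f.mu)
	return f
}

// Push appends a value and wakes one waiting consumer.
func (f *blockingFifo[T]) Push(v T) {
	f.mu.Lock()
	f.items = append(f.items, v)
	f.mu.Unlock()
	f.cond.Signal()
}

// PopMultiple blocks until at least one value is buffered (or Release has
// been called), then returns up to n values in insertion order.
func (f *blockingFifo[T]) PopMultiple(n uint) []T {
	f.mu.Lock()
	defer f.mu.Unlock()
	for len(f.items) == 0 && !f.released {
		f.cond.Wait()
	}
	if uint(len(f.items)) < n {
		n = uint(len(f.items))
	}
	out := make([]T, n)
	copy(out, f.items[:n])
	f.items = f.items[n:]
	return out
}

// Length reports how many values are currently buffered.
func (f *blockingFifo[T]) Length() int {
	f.mu.Lock()
	defer f.mu.Unlock()
	return len(f.items)
}

// Release wakes every blocked consumer so that it can return.
func (f *blockingFifo[T]) Release() {
	f.mu.Lock()
	f.released = true
	f.mu.Unlock()
	f.cond.Broadcast()
}

func main() {
	buf := newBlockingFifo[int]()
	done := make(chan []int)
	// The consumer blocks inside PopMultiple until the producer pushes.
	go func() { done <- buf.PopMultiple(10) }()
	buf.Push(1)
	fmt.Println(<-done)
}
```

The wait loop re-checks the condition after every wake-up, which is the usual pattern with sync.Cond and keeps the sketch correct when several consumers are waiting at once.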
type HasEnvID interface {
	GetEnvironmentId() string
}
type IntegratedServiceEvent interface {
	Event
	GetServiceName() string
}
type IntegratedServiceEventBase struct {
	ServiceName string `json:"serviceName"`
	
}
func (e *IntegratedServiceEventBase) GetLabels() map[string]string
func (e *IntegratedServiceEventBase) GetTimestamp() string
func (e *IntegratedServiceEventBase) SetLabels(labels map[string]string)

KafkaReader reads events from Kafka and provides a blocking, cancellable API to fetch events.

NewReaderWithTopic creates a KafkaReader for the provided topic and starts it.

Last fetches the last available message on the topic (considering all partitions). If multiple partitions have data, the event with the greatest message timestamp is returned.
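
The selection rule described for Last can be sketched in isolation. The message type, the latest function and the at helper below are hypothetical and stand in for whatever the reader uses internally; only the rule itself (the greatest message timestamp wins) comes from the description above.

```go
package main

import (
	"fmt"
	"time"
)

// message is a hypothetical stand-in for the last message of one partition.
type message struct {
	Partition int
	Time      time.Time
	Value     string
}

// at is a small helper that builds a timestamp from Unix seconds.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

// latest returns the message with the greatest timestamp among the last
// messages of each partition. ok is false when no partition has data.
func latest(lastPerPartition []message) (m message, ok bool) {
	for _, c := range lastPerPartition {
		if !ok || c.Time.After(m.Time) {
			m, ok = c, true
		}
	}
	return m, ok
}

func main() {
	m, ok := latest([]message{
		{Partition: 0, Time: at(100), Value: "older"},
		{Partition: 1, Time: at(250), Value: "newest"},
		{Partition: 2, Time: at(175), Value: "middle"},
	})
	fmt.Println(m.Partition, m.Value, ok)
}
```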

Next blocks until the next event is available or ctx is cancelled.
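
A blocking, cancellable read of this kind is commonly written as a select over a data channel and ctx.Done(). The sketch below assumes that shape; chanReader and nextWithTimeout are hypothetical names, and the real KafkaReader may deliver events differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// chanReader is a hypothetical reader that receives events over a channel.
type chanReader struct {
	events chan string
}

// Next blocks until an event is available or ctx is cancelled.
func (r *chanReader) Next(ctx context.Context) (string, error) {
	select {
	case ev := <-r.events:
		return ev, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// nextWithTimeout is a convenience wrapper that bounds the wait.
func nextWithTimeout(r *chanReader, d time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return r.Next(ctx)
}

func main() {
	r := &chanReader{events: make(chan string, 1)}
	r.events <- "run started"

	// An event is already queued, so this call returns immediately.
	ev, err := nextWithTimeout(r, time.Second)
	fmt.Println(ev, err)

	// No event is queued, so this call ends when the context expires.
	_, err = nextWithTimeout(r, 10*time.Millisecond)
	fmt.Println(errors.Is(err, context.DeadlineExceeded))
}
```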

KafkaWriter converts events from events.proto into Kafka messages and writes them. It is built with two workers:

#1 gathers kafka.Message values that any goroutine sends into a buffered channel and puts them into a FifoBuffer.
#2 pops messages from the FifoBuffer and sends them to Kafka.

The reason for this setup, rather than setting Async: true in kafka.Writer, is the ability to handle errors for failed messages.
Moreover, with a single worker that gathered messages from the channel and sent them directly to Kafka,
a burst of messages would block the whole core. The work is therefore split between two workers: the first
puts all messages into the buffer, which simply grows under a burst without blocking the whole core, and the
second does all the sending. This setup lets any number of goroutines submit messages without blocking or losing messages.
Another benefit is that messages are sent in batches instead of being written one by one.
func (w *KafkaWriter) Close()
func (w *KafkaWriter) WriteEvent(e interface{})
func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time)
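
The two-worker design described for KafkaWriter can be sketched without any Kafka dependency. Everything below is illustrative: queue, runPipeline and the send callback are hypothetical names, strings stand in for kafka.Message, and collecting failed batches is only one possible form of error handling.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a minimal unbounded buffer standing in for FifoBuffer.
type queue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []string
	closed bool
}

func newQueue() *queue {
	q := &queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

func (q *queue) push(m string) {
	q.mu.Lock()
	q.items = append(q.items, m)
	q.mu.Unlock()
	q.cond.Signal()
}

func (q *queue) close() {
	q.mu.Lock()
	q.closed = true
	q.mu.Unlock()
	q.cond.Broadcast()
}

// popBatch blocks until data is available or the queue is closed, then
// returns up to limit items. An empty result means closed and drained.
func (q *queue) popBatch(limit int) []string {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.closed {
		q.cond.Wait()
	}
	if limit > len(q.items) {
		limit = len(q.items)
	}
	batch := append([]string(nil), q.items[:limit]...)
	q.items = q.items[limit:]
	return batch
}

// runPipeline wires the two workers together and returns once every message
// from in has been handed to send. batchSize is assumed to be positive.
func runPipeline(in <-chan string, batchSize int, send func([]string) error) (failed []string) {
	q := newQueue()
	var wg sync.WaitGroup
	wg.Add(2)
	// Worker 1: drain the channel into the buffer so that producers are
	// never stalled by a slow send.
	go func() {
		defer wg.Done()
		for m := range in {
			q.push(m)
		}
		q.close()
	}()
	// Worker 2: pop batches and send them, keeping failed messages.
	go func() {
		defer wg.Done()
		for {
			batch := q.popBatch(batchSize)
			if len(batch) == 0 {
				return
			}
			if err := send(batch); err != nil {
				failed = append(failed, batch...)
			}
		}
	}()
	wg.Wait()
	return failed
}

func main() {
	in := make(chan string, 8)
	for i := 0; i < 5; i++ {
		in <- fmt.Sprintf("event-%d", i)
	}
	close(in)
	sent := 0
	failed := runPipeline(in, 2, func(batch []string) error {
		sent += len(batch)
		return nil
	})
	fmt.Println(sent, len(failed))
}
```

Because the first worker only appends to an in-memory buffer, a burst of messages costs memory rather than blocking producers, while the second worker is free to send in batches at whatever pace the broker allows.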

Reader interface provides methods to read events.

func (e *RoleEvent) GetTimestamp() string
func (e *TaskEvent) GetTimestamp() string
type TaskInternalError struct {
	DeviceEventBase
}
func (e *TaskInternalError) GetTimestamp() string
func (e *TaskInternalError) SetLabels(labels map[string]string)
type TasksReleasedEvent struct {
	EnvironmentId     uid.ID           `json:"environmentId"`
	TaskIdsReleased   []string         `json:"taskIdsReleased"`
	TaskReleaseErrors map[string]error `json:"taskReleaseErrors"`
	
}
func (tr *TasksReleasedEvent) GetEnvironmentId() uid.ID
func (e *TasksReleasedEvent) GetTimestamp() string
func (e *TasksReleasedEvent) SetLabels(labels map[string]string)
type TasksStateChangedEvent struct {
	EnvironmentId       uid.ID   `json:"environmentId"`
	TaskIdsStateChanged []string `json:"taskIdsStatesChanged"`
	TaskStateChangedErr error    `json:"taskStateChangedErr"`
	
}
func NewTasksStateChangedEvent(envId uid.ID, taskIdsChangeState []string, taskStateChangedErr error) (tr *TasksStateChangedEvent)
func (tr *TasksStateChangedEvent) GetEnvironmentId() uid.ID
func (e *TasksStateChangedEvent) GetLabels() map[string]string
func (tr *TasksStateChangedEvent) GetTasksStateChangedError() error
func (e *TasksStateChangedEvent) GetTimestamp() string
func (e *TasksStateChangedEvent) SetLabels(labels map[string]string)
type Writer interface {
	WriteEvent(e interface{})
	WriteEventWithTimestamp(e interface{}, timestamp time.Time)
	Close()
}
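
As an illustration of how the Writer interface might be satisfied outside of Kafka, the sketch below implements it with an in-memory writer, which can be handy in tests. The interface is copied locally so that the example is self-contained; memoryWriter and record are hypothetical names, and silently dropping writes after Close is an assumption, not documented behaviour of the package.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Writer mirrors the interface listed above.
type Writer interface {
	WriteEvent(e interface{})
	WriteEventWithTimestamp(e interface{}, timestamp time.Time)
	Close()
}

// record pairs an event with the timestamp it was written with.
type record struct {
	Event interface{}
	At    time.Time
}

// memoryWriter is a hypothetical Writer that collects events in memory.
type memoryWriter struct {
	mu      sync.Mutex
	records []record
	closed  bool
}

// Compile-time check that memoryWriter satisfies Writer.
var _ Writer = (*memoryWriter)(nil)

// WriteEvent stores the event stamped with the current time.
func (w *memoryWriter) WriteEvent(e interface{}) {
	w.WriteEventWithTimestamp(e, time.Now())
}

// WriteEventWithTimestamp stores the event with the given timestamp.
func (w *memoryWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.closed {
		return // assumption: writes after Close are silently dropped
	}
	w.records = append(w.records, record{Event: e, At: timestamp})
}

// Close marks the writer as closed.
func (w *memoryWriter) Close() {
	w.mu.Lock()
	w.closed = true
	w.mu.Unlock()
}

func main() {
	mw := &memoryWriter{}
	var w Writer = mw
	w.WriteEvent("environment created")
	w.WriteEventWithTimestamp("task started", time.Unix(0, 0))
	w.Close()
	w.WriteEvent("ignored after close")
	fmt.Println(len(mw.records))
}
```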